From 1210085dac6ec16540506d54e81b6947b6647ffa Mon Sep 17 00:00:00 2001 From: Paho Lurie-Gregg Date: Thu, 21 Sep 2023 09:01:44 -0700 Subject: [PATCH] Provide a new garbage-collection mechanism We introduce the configuration `min_available_memory` which causes disconnected clients to be cleaned up when the available system memory drops below this amount. While this is compatible with the `instance_timeout` setting, it provides an alternative; letting `rust-analyzer` processes live forever unless memory gets low. --- Cargo.lock | 116 +++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 2 + README.md | 12 ++++- src/config.rs | 45 ++++++++++++---- src/server/instance.rs | 53 +++++++++++++++++-- 5 files changed, 214 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 845e953..626412b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -113,6 +113,16 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" +[[package]] +name = "byte-unit" +version = "4.0.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da78b32057b8fdfc352504708feeba7216dcd65a2c9ab02978cbd288d1279b6c" +dependencies = [ + "serde", + "utf8-width", +] + [[package]] name = "bytes" version = "1.4.0" @@ -181,6 +191,45 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "core-foundation-sys" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa" + +[[package]] +name = "crossbeam-deque" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae211234986c545741a7dc064309f67ee1e5ad243d0e48335adc0484d960bcc7" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "memoffset", + "scopeguard", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" +dependencies = [ + "cfg-if", +] + [[package]] name = "directories" version = "4.0.1" @@ -201,6 +250,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "either" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" + [[package]] name = "env_logger" version = "0.10.0" @@ -321,6 +376,15 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +[[package]] +name = "memoffset" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c" +dependencies = [ + "autocfg", +] + [[package]] name = "miniz_oxide" version = "0.7.1" @@ -341,6 +405,15 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "ntapi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" +dependencies = [ + "winapi", +] + [[package]] name = "num_cpus" version = "1.16.0" @@ -418,6 +491,7 @@ name = "ra-multiplex" version = "0.2.2" dependencies = [ "anyhow", + "byte-unit", "clap", "directories", "env_logger", @@ -425,10 +499,31 @@ dependencies = [ "serde", "serde_derive", "serde_json", + "sysinfo", "tokio", "toml", ] +[[package]] +name = "rayon" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c27db03db7734835b3f53954b534c91069375ce6ccaa2e065441e07d9b6cdb1" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ce3fb6ad83f861aac485e76e1985cd109d9a3713802152be56c3b1f0e0658ed" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -591,6 +686,21 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sysinfo" +version = "0.29.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a18d114d420ada3a891e6bc8e96a2023402203296a47cdd65083377dad18ba5" +dependencies = [ + "cfg-if", + "core-foundation-sys", + "libc", + "ntapi", + "once_cell", + "rayon", + "winapi", +] + [[package]] name = "termcolor" version = "1.2.0" @@ -665,6 +775,12 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c" +[[package]] +name = "utf8-width" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5190c9442dcdaf0ddd50f37420417d219ae5261bbf5db120d0f9bab996c9cba1" + [[package]] name = "utf8parse" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index f9409ac..04ea8f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] anyhow = "1.0.53" +byte-unit = { version = "4", default-features = false, features = ["serde", "std"] } clap = { version = "4.3.0", features = ["derive", "env"] } directories = "4.0.1" env_logger = "0.10.0" @@ -15,5 +16,6 @@ log = "0.4.14" serde = { version = "1.0.186" } serde_derive = { version = "1.0.186" } serde_json = "1.0.78" +sysinfo = "0.29" tokio = { version = "1.15.0", features = ["fs", "io-std", "io-util", "macros", "net", "parking_lot", "process", "rt-multi-thread", "sync", "time"] } toml = "0.5.8" diff --git a/README.md b/README.md index a833200..83f3c95 100644 --- a/README.md +++ b/README.md @@ -95,8 +95,18 @@ Example configuration file: # you can set this option to `false` for infinite timeout instance_timeout = 300 # after 5 minutes +# when available system memory drops below this amount, the oldest rust-analyzer +# server instance wih no clients will get killed immediately to save memory. +# +# if all server instances have connected clients, then nothing will happen. +# +# you can set this option to `false` to disable it, or set it to some value in +# bytes to enable it, such as "4 GB". +min_available_memory = false + # time in seconds how long to wait between the gc task checks for disconnected -# clients and possibly starts a timeout task. the value must be at least 1. +# clients and available system memory, and possibly starts a timeout task. the +# value must be at least 1. gc_interval = 10 # every 10 seconds # ip address and port on which ra-multiplex-server listens diff --git a/src/config.rs b/src/config.rs index 70792de..1c957e8 100644 --- a/src/config.rs +++ b/src/config.rs @@ -15,6 +15,10 @@ mod default { Some(5 * 60) } + pub fn min_available_memory() -> Option { + None + } + pub fn gc_interval() -> u32 { // 10 seconds 10 @@ -41,28 +45,44 @@ mod default { mod de { use super::*; + #[derive(Deserialize)] + #[serde(untagged)] + enum OneOf { + Bool(bool), + Value(T), + } + /// parse either bool(false) or u32 pub fn instance_timeout<'de, D>(deserializer: D) -> Result, D::Error> where D: Deserializer<'de>, { - #[derive(Deserialize)] - #[serde(untagged)] - enum OneOf { - Bool(bool), - U32(u32), - } - match OneOf::deserialize(deserializer) { - Ok(OneOf::U32(value)) => Ok(Some(value)), + Ok(OneOf::Value(value)) => Ok(Some(value)), Ok(OneOf::Bool(false)) => Ok(None), Ok(OneOf::Bool(true)) => Err(Error::invalid_value( Unexpected::Bool(true), &"a non-negative integer or false", )), - Err(_) => Err(Error::custom( - "invalid type: expected a non-negative integer or false", + Err(_) => Err(Error::custom("invalid type: expected a non-negative integer or false")), + } + } + + /// parse either bool(false) or a number in bytes + pub fn min_available_memory<'de, D>( + deserializer: D, + ) -> Result, D::Error> + where + D: Deserializer<'de>, + { + match OneOf::deserialize(deserializer) { + Ok(OneOf::Value(value)) => Ok(Some(value)), + Ok(OneOf::Bool(false)) => Ok(None), + Ok(OneOf::Bool(true)) => Err(Error::invalid_value( + Unexpected::Bool(true), + &"a value in bytes (e.g. '100 MB') or false", )), + Err(_) => Err(Error::custom("invalid type: expected a value in bytes (e.g. '100 MB') or false")), } } @@ -88,6 +108,10 @@ pub struct Config { #[serde(deserialize_with = "de::instance_timeout")] pub instance_timeout: Option, + #[serde(default = "default::min_available_memory")] + #[serde(deserialize_with = "de::min_available_memory")] + pub min_available_memory: Option, + #[serde(default = "default::gc_interval")] #[serde(deserialize_with = "de::gc_interval")] pub gc_interval: u32, @@ -129,6 +153,7 @@ impl Config { connect: default::connect(), log_filters: default::log_filters(), workspace_detection: default::workspace_detection(), + min_available_memory: default::min_available_memory(), } } diff --git a/src/server/instance.rs b/src/server/instance.rs index 745f3ae..e8434d3 100644 --- a/src/server/instance.rs +++ b/src/server/instance.rs @@ -18,9 +18,13 @@ use std::str::{self, FromStr}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use tokio::io::{AsyncBufReadExt, BufReader}; +use sysinfo::SystemExt; use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command}; use tokio::sync::{mpsc, Mutex, Notify, RwLock}; +use tokio::{ + io::{AsyncBufReadExt, BufReader}, + time::Instant, +}; use tokio::{select, task, time}; /// keeps track of the initialize/initialized handshake for an instance @@ -117,6 +121,7 @@ impl InstanceKey { pub struct RaInstance { pid: u32, + spawned_at: Instant, /// wakes up the wait_task and asks it to send SIGKILL to the instance close: Notify, /// make sure only one timeout_task is running for an instance @@ -170,15 +175,21 @@ impl InstanceRegistry { async fn spawn_gc_task(&self) { let config = Config::load_or_default().await; let gc_interval = config.gc_interval.into(); - let instance_timeout = config.instance_timeout.map(<_>::from); + let instance_timeout = config.instance_timeout.map(u64::from); + let min_available_memory = config.min_available_memory.map(u64::from); let registry = self.clone(); let mut closed_ports = Vec::new(); + let mut sysinfo = sysinfo::System::new(); + task::spawn(async move { loop { time::sleep(Duration::from_secs(gc_interval)).await; - for (key, instance) in registry.map.lock().await.iter() { + let map = registry.map.lock().await; + + // Clean-up message_readers + for instance in map.values() { for (port, sender) in instance.message_readers.read().await.iter() { if sender.is_closed() { closed_ports.push(*port); @@ -188,7 +199,42 @@ impl InstanceRegistry { for port in closed_ports.drain(..) { message_readers.remove(&port); } + } + + // First check if we need to immediately kill any instances. + if let Some(min_available) = min_available_memory { + sysinfo.refresh_memory(); + let available_memory = sysinfo.available_memory(); + + if available_memory < min_available { + let mut oldest = None; + + for instance in map.values() { + if instance.message_readers.read().await.is_empty() { + oldest = match oldest { + None => Some(instance), + Some(old) => { + Some(std::cmp::min_by_key(old, instance, |i| i.spawned_at)) + } + }; + } + } + + if let Some(oldest) = oldest { + let pid = oldest.pid; + let path = oldest.key.workspace_root.display(); + log::warn!("[{path} {pid}] system low on memory, stopping now)"); + oldest.close.notify_one(); + } else { + log::debug!("low on memory, no instance without connections to kill"); + } + } + } + + // Now check for instances that need a timeout. + for (key, instance) in map.iter() { if let Some(instance_timeout) = instance_timeout { + let message_readers = instance.message_readers.read().await; if message_readers.is_empty() && instance.attempt_start_timeout() { registry.spawn_timeout_task(key, instance_timeout); let pid = instance.pid; @@ -261,6 +307,7 @@ impl RaInstance { let instance = Arc::new(RaInstance { pid, + spawned_at: Instant::now(), close: Notify::new(), timeout_running: AtomicBool::new(false), key: key.to_owned(),