Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/thread handling controller internal api #51

Open
wants to merge 3 commits into
base: team/cluster-controller
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions controller/lib/src/internal_api/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,17 @@ use tonic::transport::Server;
pub struct InternalAPIInterface {}

impl InternalAPIInterface {
pub async fn new(address: SocketAddr, num_workers: usize) -> Self {
info!(
"Starting {} gRPC worker(s) listening on {}",
num_workers, address
);
pub async fn new(address: SocketAddr) -> Self {
info!("Starting gRPC server listening on {}", address);

tokio::spawn(async move {
Server::builder()
.add_service(NodeServiceServer::new(NodeController::default()))
.serve(address)
.await
.unwrap();
});

for _ in 1..num_workers {
tokio::spawn(async move {
Server::builder()
.add_service(NodeServiceServer::new(NodeController::default()))
.serve(address)
.await
.unwrap();
});
}
Self {}
}
}
2 changes: 0 additions & 2 deletions controller/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ pub struct KudoControllerConfig {
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct InternalAPIConfig {
pub grpc_server_addr: SocketAddr,
pub grpc_server_num_workers: usize,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
Expand All @@ -27,7 +26,6 @@ impl Default for KudoControllerConfig {
std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
50051,
),
grpc_server_num_workers: 1,
},
external_api: ExternalAPIConfig {
http_server_addr: SocketAddr::new(
Expand Down
6 changes: 1 addition & 5 deletions controller/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let config: config::KudoControllerConfig = confy::load_path("controller.conf")?;

// gRPC Server
internal_api::interface::InternalAPIInterface::new(
config.internal_api.grpc_server_addr,
config.internal_api.grpc_server_num_workers,
)
.await;
internal_api::interface::InternalAPIInterface::new(config.internal_api.grpc_server_addr).await;

// HTTP Server
external_api::interface::ExternalAPIInterface::new(
Expand Down
7 changes: 3 additions & 4 deletions kudoctl/src/subcommands/get/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
mod instance;
mod instances;
mod output;
mod resource;
mod resources;
use self::output::OutputFormat;
use crate::config;
use anyhow::{bail, Result};
use anyhow::Result;
use clap::{Args, ValueEnum};

#[derive(Debug, Args)]
Expand Down Expand Up @@ -48,10 +49,8 @@ pub async fn execute(args: GetSubcommand, conf: &config::Config) -> Result<Strin

match args.subject {
GetSubjects::Resources => resources::execute(conf, format, show_header).await,
GetSubjects::Resource => resource::execute(conf, format, args.id).await,
GetSubjects::Instances => instances::execute(conf, format, show_header).await,
GetSubjects::Instance => instance::execute(conf, format, args.id).await,
GetSubjects::Resource => {
bail!(format!("{:?} not implemented yet", args.subject))
}
}
}
63 changes: 63 additions & 0 deletions kudoctl/src/subcommands/get/resource.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use super::output::{self, OutputFormat};
use crate::{
client::{self, request::Client},
config,
resource::workload::Workload,
};
use anyhow::{bail, Context, Result};
use std::fmt::Display;

/// get workload <id> subcommand execution
/// Does the request, then formats the output.
pub async fn execute(
conf: &config::Config,
format: OutputFormat,
search: Option<String>,
) -> Result<String> {
if search.is_none() {
bail!("You must provide an instance id");
}
let search = search.unwrap();

let client = Client::new(conf).context("Error creating client")?;
let result = client::workload::get(&client, search.as_str()).await?;

output::format_output(result, format)
}

impl Display for Workload {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f, "name : {}\n", self.name)?;
writeln!(f, "uri : {}\n", self.uri)?;

if let Some(ports) = &self.ports {
// display ports
let ports_str = ports
.iter()
.fold(String::new(), |acc, port| acc + &format!("{} ", port))
.trim()
.replace(' ', ",")
.replace(':', "->");
writeln!(f, "ports : {} ", ports_str)?;
}

if let Some(env) = &self.env {
// display environment variables
let env_vars_str = env
.iter()
.fold(String::new(), |acc, env_var| acc + &format!("{} ", env_var))
.trim()
.replace(' ', ",");
writeln!(f, "env variables : {} ", env_vars_str)?;
}

// display resources

writeln!(
f,
"resources : {}milliCPU, {}mB memory, {}GB disk ",
self.resources.cpu, self.resources.memory, self.resources.disk
)?;
Ok(())
}
}
2 changes: 2 additions & 0 deletions node-agent/node_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
sysinfo = "0.24.6"
log = "0.4.0"
186 changes: 184 additions & 2 deletions node-agent/node_manager/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,185 @@
fn main() {
println!("Hello, world!");
use std::{thread::sleep, time::Duration};

use log::debug;
use sysinfo::{CpuExt, DiskExt, System, SystemExt};

const KB_TO_MB: u64 = 1000;
const BIT_TO_GB: u64 = 1000000000;

pub struct NodeSystem {
sys: System,
}

impl NodeSystem {
// used in all functions to access system
pub fn new() -> Self {
let mut sys = System::new_all();

NodeSystem { sys }
}

fn refresh_system(&mut self) {
self.sys.refresh_all();
}

// ------ CPU ------

/*
Returns the CPU limit (in MilliCPU)
*/
pub fn total_cpu(&mut self) -> u64 {
self.refresh_system();

let nb_of_core: u64 = self.sys.cpus().len() as u64;

let amount_of_milli_cpu: u64 = nb_of_core * 1000;
debug!("total cpu: {:?} MilliCPU", &amount_of_milli_cpu);

amount_of_milli_cpu
}

/*
Returns the CPU usage (in MilliCPU)
*/
pub fn used_cpu(&mut self) -> u64 {
// we need to refresh the cpus at least 2 times (with 200 miliseconds interval) to get an accurate cpu usage
for _ in 0..3 {
self.sys.refresh_cpu();
sleep(Duration::from_millis(200));
}

let amount_of_milli_cpu: u64 = Self::total_cpu(self);
let cpu_usage_in_pourcent: u64 = self.sys.global_cpu_info().cpu_usage().ceil() as u64;

let cpu_usage: u64 = amount_of_milli_cpu * cpu_usage_in_pourcent / 100;
debug!("used cpu: {:?} MilliCPU", &cpu_usage);

cpu_usage
}

// ------ MEMORY ------

/*
Returns the limit memory space (in MB)
*/
pub fn total_memory(&mut self) -> u64 {
self.refresh_system();

let total_memory: u64 = (self.sys.available_memory() + self.sys.used_memory()) / KB_TO_MB;
debug!("total memory: {:?} MB", &total_memory);

total_memory
}

/*
Returns the used memory space (in MB)
*/
pub fn used_memory(&mut self) -> u64 {
self.refresh_system();

let used_memory: u64 = self.sys.used_memory() / KB_TO_MB;
debug!("used memory: {} MB", &used_memory);

used_memory
}

// ------ DISK ------

/*
Returns the limit space of the main disk (in GB)
*/
pub fn total_disk(&mut self) -> u64 {
self.refresh_system();

let mut main_disk_total_space: u64 = 0;

for disk in self.sys.disks() {
let current_total_space: u64 = disk.total_space();

if current_total_space > main_disk_total_space {
main_disk_total_space = current_total_space;
}
}

main_disk_total_space = main_disk_total_space / BIT_TO_GB;

debug!("total disk space: {} GB", main_disk_total_space);

main_disk_total_space
}

/*
Returns the used space of the main disk (in GB)
*/
pub fn used_disk(&mut self) -> u64 {
self.refresh_system();

// get used disk from main disk
let mut main_disk_availability: u64 = 0;
let mut main_disk_total_space: u64 = 0;

for disk in self.sys.disks() {
let current_available_space: u64 = disk.available_space();
let current_total_space: u64 = disk.total_space();

if current_available_space > main_disk_availability {
main_disk_availability = current_available_space;
}

if current_total_space > main_disk_total_space {
main_disk_total_space = current_total_space;
}
}

let used_disk: u64 = (main_disk_total_space - main_disk_availability) / BIT_TO_GB;
debug!("used disk: {} GB", used_disk);

used_disk
}
}

#[cfg(test)]
mod tests {
use super::*;

fn init_system() -> NodeSystem {
NodeSystem::new()
}

#[test]
fn test_total_cpu() {
let mut sys = init_system();

assert!(sys.total_cpu() >= 1000) // minimum 1 core
}

#[test]
fn test_used_cpu() {
let mut sys = init_system();

let total_cpu = sys.total_cpu();
let used_cpu = sys.used_cpu();

assert!(used_cpu <= total_cpu)
}

#[test]
fn test_used_memory() {
let mut sys = init_system();

let total_memory = sys.total_memory();
let used_memory = sys.used_memory();

assert!(used_memory <= total_memory)
}

#[test]
fn test_used_disk() {
let mut sys = init_system();

let total_disk = sys.total_disk();
let used_disk = sys.used_disk();

assert!(used_disk <= total_disk)
}
}
Empty file.