Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Threads inside of Partitions instead of Processes #107

Merged
merged 9 commits into from
Feb 29, 2024
265 changes: 245 additions & 20 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ procfs = "0.16"
polling = "3.4"
itertools = "0.12.1"
once_cell = "1.19"
bytesize = "1.1"
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ serde = { version = "1.0", features = ["derive"] }
memfd = "0.6"
bincode = "1.3"
thiserror = "1.0"
bytesize = {version = "1.1.0", features = ["serde"]}
bytesize = {workspace = true, features = ["serde"]}
byteorder = "1.4.3"
enum_primitive = "0.1"

Expand Down
59 changes: 51 additions & 8 deletions core/src/cgroup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,15 @@ impl CGroup {
Self::new_root(&self.path, name)
}

/// Creates a threaded sub-cgroup inside this one
pub fn new_threaded(&self, name: &str) -> anyhow::Result<Self> {
let cgroup = Self::new_root(&self.path, name)?;
cgroup.set_threaded()?;
Ok(cgroup)
}

/// Moves a process to this cgroup
pub fn mv(&self, pid: Pid) -> anyhow::Result<()> {
pub fn mv_proc(&self, pid: Pid) -> anyhow::Result<()> {
trace!("Move {pid:?} to {}", self.get_path().display());
if !is_cgroup(&self.path)? {
bail!("{} is not a valid cgroup", self.path.display());
Expand All @@ -83,6 +90,28 @@ impl CGroup {
Ok(())
}

/// Moves a thread to this cgroup
pub fn mv_thread(&self, pid: Pid) -> anyhow::Result<()> {
trace!("Move {pid:?} to {}", self.get_path().display());
if !is_cgroup(&self.path)? {
bail!("{} is not a valid cgroup", self.path.display());
}

fs::write(self.path.join("cgroup.threads"), pid.to_string())?;
Ok(())
}

/// Changes the cgroups type to "threaded"
fn set_threaded(&self) -> anyhow::Result<()> {
trace!("Change type of {} to threaded", self.get_path().display());
if !is_cgroup(&self.path)? {
bail!("{} is not a valid cgroup", self.path.display());
}

fs::write(self.path.join("cgroup.type"), "threaded")?;
Ok(())
}

/// Returns all PIDs associated with this cgroup
pub fn get_pids(&self) -> anyhow::Result<Vec<Pid>> {
if !is_cgroup(&self.path)? {
Expand All @@ -97,6 +126,20 @@ impl CGroup {
Ok(pids)
}

/// Returns all TIDs associated with this cgroup
pub fn get_tids(&self) -> anyhow::Result<Vec<Pid>> {
if !is_cgroup(&self.path)? {
bail!("{} is not a valid cgroup", self.path.display());
}

let pids: Vec<Pid> = fs::read(self.path.join("cgroup.threads"))?
.lines()
.map(|line| Pid::from_raw(line.unwrap().parse().unwrap()))
.collect();

Ok(pids)
}

/// Checks whether this cgroup is populated
pub fn populated(&self) -> anyhow::Result<bool> {
if !is_cgroup(&self.path)? {
Expand Down Expand Up @@ -323,8 +366,8 @@ mod tests {
let cg1 = CGroup::new_root(get_path(), &gen_name()).unwrap();
let cg2 = cg1.new(&gen_name()).unwrap();

cg1.mv(pid).unwrap();
cg2.mv(pid).unwrap();
cg1.mv_proc(pid).unwrap();
cg2.mv_proc(pid).unwrap();
proc.kill().unwrap();

cg1.rm().unwrap();
Expand All @@ -341,14 +384,14 @@ mod tests {
assert!(cg1.get_pids().unwrap().is_empty());
assert!(cg2.get_pids().unwrap().is_empty());

cg1.mv(pid).unwrap();
cg1.mv_proc(pid).unwrap();
let pids = cg1.get_pids().unwrap();
assert!(!pids.is_empty());
assert!(cg2.get_pids().unwrap().is_empty());
assert_eq!(pids.len(), 1);
assert_eq!(pids[0], pid);

cg2.mv(pid).unwrap();
cg2.mv_proc(pid).unwrap();
let pids = cg2.get_pids().unwrap();
assert!(!pids.is_empty());
assert!(cg1.get_pids().unwrap().is_empty());
Expand All @@ -369,7 +412,7 @@ mod tests {
assert!(!cg.populated().unwrap());
assert_eq!(cg.populated().unwrap(), !cg.get_pids().unwrap().is_empty());

cg.mv(pid).unwrap();
cg.mv_proc(pid).unwrap();
assert!(cg.populated().unwrap());
assert_eq!(cg.populated().unwrap(), !cg.get_pids().unwrap().is_empty());

Expand All @@ -394,7 +437,7 @@ mod tests {
assert!(!cg.frozen().unwrap());

// Do the same with a non-empty cgroup
cg.mv(pid).unwrap();
cg.mv_proc(pid).unwrap();
cg.freeze().unwrap();
assert!(cg.frozen().unwrap());
cg.unfreeze().unwrap();
Expand All @@ -415,7 +458,7 @@ mod tests {
cg.kill().unwrap();

// Do the same with a non-empty cgroup
cg.mv(pid).unwrap();
cg.mv_proc(pid).unwrap();
assert!(cg.populated().unwrap());
cg.kill().unwrap();

Expand Down
46 changes: 29 additions & 17 deletions core/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::marker::PhantomData;
use std::os::fd::{AsFd, BorrowedFd, OwnedFd};
use std::os::unix::net::UnixDatagram;
use std::os::unix::prelude::{AsRawFd, FromRawFd, RawFd};
use std::path::Path;
use std::time::Duration;

use anyhow::Error;
Expand Down Expand Up @@ -90,24 +91,17 @@ where
}
}

/// Create a pair consisting of an IpcSender and an IpcReceiver
pub fn channel_pair<T>() -> TypedResult<(IpcSender<T>, IpcReceiver<T>)>
where
T: for<'de> Deserialize<'de> + Serialize,
{
trace!("Create IPC channel pair");
let (tx, rx) = socketpair(
AddressFamily::Unix,
SockType::Datagram,
None,
SockFlag::SOCK_NONBLOCK,
)
.typ(SystemError::Panic)?;

let tx = IpcSender::from(tx);
let rx = IpcReceiver::from(rx);
pub fn bind_receiver<T>(path: &Path) -> TypedResult<IpcReceiver<T>> {
let socket = UnixDatagram::bind(path).typ(SystemError::Panic)?;
socket.set_nonblocking(true).typ(SystemError::Panic)?;
Ok(IpcReceiver::from(socket))
}

Ok((tx, rx))
pub fn connect_sender<T>(path: &Path) -> TypedResult<IpcSender<T>> {
let socket = UnixDatagram::unbound().typ(SystemError::Panic)?;
socket.connect(path).typ(SystemError::Panic)?;
socket.set_nonblocking(true).typ(SystemError::Panic)?;
Ok(IpcSender::from(socket))
}

impl<T> AsRawFd for IpcSender<T> {
Expand All @@ -134,6 +128,24 @@ impl<T> AsFd for IpcReceiver<T> {
}
}

impl<T> From<UnixDatagram> for IpcReceiver<T> {
fn from(value: UnixDatagram) -> Self {
Self {
socket: value,
_p: PhantomData,
}
}
}

impl<T> From<UnixDatagram> for IpcSender<T> {
fn from(value: UnixDatagram) -> Self {
Self {
socket: value,
_p: PhantomData,
}
}
}

impl<T> From<OwnedFd> for IpcSender<T> {
fn from(value: OwnedFd) -> Self {
Self {
Expand Down
4 changes: 3 additions & 1 deletion core/src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ pub struct PartitionConstants {
pub period: Duration,
pub duration: Duration,
pub start_condition: StartCondition,
pub sender_fd: RawFd,
pub start_time_fd: RawFd,
pub partition_mode_fd: RawFd,

Expand All @@ -37,8 +36,11 @@ pub struct SamplingConstant {

impl PartitionConstants {
pub const PARTITION_CONSTANTS_FD: &'static str = "PARTITION_CONSTANTS_FD";
pub const PROCESSES_CGROUP: &'static str = "processes";
pub const MAIN_PROCESS_CGROUP: &'static str = "main";
pub const APERIODIC_PROCESS_CGROUP: &'static str = "aperiodic";
pub const PERIODIC_PROCESS_CGROUP: &'static str = "periodic";
pub const IPC_SENDER: &'static str = "/.inner/ipc";

pub fn open() -> TypedResult<Self> {
let fd = std::env::var(Self::PARTITION_CONSTANTS_FD)
Expand Down
56 changes: 28 additions & 28 deletions core/src/syscall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,21 +179,21 @@ pub enum ApexSyscall {
}

#[derive(Debug, PartialEq)]
pub struct SyscallRequ {
pub struct SyscallRequest {
pub id: ApexSyscall,
pub params: Vec<u64>,
}

#[derive(Debug, PartialEq)]
pub struct SyscallResp {
pub struct SyscallResponse {
pub id: ApexSyscall,
pub status: u64,
}

impl SyscallRequ {
/// Serializes a SyscallRequ into its binary representation
impl SyscallRequest {
/// Serializes a SyscallRequest into its binary representation
///
/// The format for serializing a SyscallRequ is defined as follows:
/// The format for serializing a SyscallRequest is defined as follows:
/// ```text
/// id [u64]
/// nparams [u8]
Expand All @@ -212,7 +212,7 @@ impl SyscallRequ {
Ok(serialized)
}

/// Deserializes a serialized SyscallRequ back into its internal
/// Deserializes a serialized SyscallRequest back into its internal
/// representation
pub fn deserialize(serialized: &Vec<u8>) -> Result<Self> {
let mut serialized: &[u8] = serialized;
Expand All @@ -226,14 +226,14 @@ impl SyscallRequ {
params.push(serialized.read_u64::<NativeEndian>()?);
}

Ok(SyscallRequ { id, params })
Ok(SyscallRequest { id, params })
}
}

impl SyscallResp {
/// Serializes a SyscallResp into its binary representation
impl SyscallResponse {
/// Serializes a SyscallResponse into its binary representation
///
/// The format for serializing a SyscallResp is defined as follows:
/// The format for serializing a SyscallResponse is defined as follows:
/// ```text
/// id [u64]
/// status [u64]
Expand All @@ -248,7 +248,7 @@ impl SyscallResp {
Ok(serialized)
}

/// Deserializes a serialized SyscallResp back into its internal
/// Deserializes a serialized SyscallResponse back into its internal
/// representation
pub fn deserialize(serialized: &Vec<u8>) -> Result<Self> {
let mut serialized: &[u8] = serialized;
Expand All @@ -257,7 +257,7 @@ impl SyscallResp {
.ok_or(anyhow!("deserialization of ApexSyscall failed"))?;
let status = serialized.read_u64::<NativeEndian>()?;

Ok(SyscallResp { id, status })
Ok(SyscallResponse { id, status })
}
}

Expand All @@ -266,12 +266,12 @@ mod tests {
use super::*;

#[test]
fn test_serialize_requ() {
let requ = SyscallRequ {
fn test_serialize_request() {
let request = SyscallRequest {
id: ApexSyscall::Start,
params: vec![1, 2, 3],
};
let serialized = requ.serialize().unwrap();
let serialized = request.serialize().unwrap();
let mut serialized: &[u8] = &serialized;

let id = serialized.read_u64::<NativeEndian>().unwrap();
Expand All @@ -290,12 +290,12 @@ mod tests {
}

#[test]
fn test_serialize_resp() {
let resp = SyscallResp {
fn test_serialize_response() {
let response = SyscallResponse {
id: ApexSyscall::Start,
status: 42,
};
let serialized = resp.serialize().unwrap();
let serialized = response.serialize().unwrap();
let mut serialized: &[u8] = &serialized;

let id = serialized.read_u64::<NativeEndian>().unwrap();
Expand All @@ -307,26 +307,26 @@ mod tests {
}

#[test]
fn test_deserialize_requ() {
let requ = SyscallRequ {
fn test_deserialize_request() {
let request = SyscallRequest {
id: ApexSyscall::Start,
params: vec![1, 2, 3],
};
let serialized = requ.serialize().unwrap();
let deserialized = SyscallRequ::deserialize(&serialized).unwrap();
assert_eq!(requ, deserialized);
let serialized = request.serialize().unwrap();
let deserialized = SyscallRequest::deserialize(&serialized).unwrap();
assert_eq!(request, deserialized);
assert!(!serialized.is_empty());
}

#[test]
fn test_deserialize_resp() {
let resp = SyscallResp {
fn test_deserialize_response() {
let response = SyscallResponse {
id: ApexSyscall::Start,
status: 42,
};
let serialized = resp.serialize().unwrap();
let deserialized = SyscallResp::deserialize(&serialized).unwrap();
assert_eq!(resp, deserialized);
let serialized = response.serialize().unwrap();
let deserialized = SyscallResponse::deserialize(&serialized).unwrap();
assert_eq!(response, deserialized);
assert!(!serialized.is_empty());
}
}
1 change: 1 addition & 0 deletions hypervisor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ procfs.workspace = true
polling.workspace = true
itertools.workspace = true
once_cell.workspace = true
bytesize.workspace = true
anyhow = "1.0"
tempfile = "3.3"
clone3 = "0.2"
Expand Down
4 changes: 2 additions & 2 deletions hypervisor/src/hypervisor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl Hypervisor {

pub fn run(mut self) -> LeveledResult<()> {
self.cg
.mv(nix::unistd::getpid())
.mv_proc(nix::unistd::getpid())
.typ(SystemError::CGroup)
.lev(ErrorLevel::ModuleInit)?;

Expand Down Expand Up @@ -178,7 +178,7 @@ impl Drop for Hypervisor {
// requires that the cgroup must have been deleted externally
if let Err(e) = CGroup::import_root(&self.prev_cg)
.unwrap()
.mv(nix::unistd::getpid())
.mv_proc(nix::unistd::getpid())
{
error!("{e}")
}
Expand Down
Loading
Loading