From 09d88aa44b2d5290068590d99290ef39858e0c7a Mon Sep 17 00:00:00 2001 From: jokemanfire Date: Wed, 8 Jan 2025 15:04:56 +0800 Subject: [PATCH] OpenOptions in blocking mod is a time-consuming operation. Prevent it from blocking a tokio thread. Change sync to async. Signed-off-by: jokemanfire --- crates/runc/src/asynchronous/io.rs | 216 ++++++++++++++++++++++++ crates/runc/src/asynchronous/mod.rs | 21 +-- crates/runc/src/asynchronous/pipe.rs | 69 ++++++++ crates/runc/src/lib.rs | 21 ++- crates/runc/src/{ => synchronous}/io.rs | 3 +- crates/runc/src/synchronous/mod.rs | 2 +- 6 files changed, 308 insertions(+), 24 deletions(-) create mode 100644 crates/runc/src/asynchronous/io.rs rename crates/runc/src/{ => synchronous}/io.rs (99%) diff --git a/crates/runc/src/asynchronous/io.rs b/crates/runc/src/asynchronous/io.rs new file mode 100644 index 00000000..c9969bdc --- /dev/null +++ b/crates/runc/src/asynchronous/io.rs @@ -0,0 +1,216 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +use std::{fmt::Debug, io::Result, os::unix::io::AsRawFd, process::Stdio}; + +use async_trait::async_trait; +use nix::unistd::{Gid, Uid}; +use tokio::fs::OpenOptions; + +pub use crate::Io; +use crate::{Command, Pipe, PipedIo}; + +#[derive(Debug, Clone)] +pub struct IOOption { + pub open_stdin: bool, + pub open_stdout: bool, + pub open_stderr: bool, +} + +impl Default for IOOption { + fn default() -> Self { + Self { + open_stdin: true, + open_stdout: true, + open_stderr: true, + } + } +} + +impl PipedIo { + pub fn new(uid: u32, gid: u32, opts: &IOOption) -> std::io::Result { + Ok(Self { + stdin: Self::create_pipe(uid, gid, opts.open_stdin, true)?, + stdout: Self::create_pipe(uid, gid, opts.open_stdout, false)?, + stderr: Self::create_pipe(uid, gid, opts.open_stderr, false)?, + }) + } + + fn create_pipe( + uid: u32, + gid: u32, + enabled: bool, + stdin: bool, + ) -> std::io::Result> { + if !enabled { + return Ok(None); + } + + let pipe = Pipe::new()?; + let uid = Some(Uid::from_raw(uid)); + let gid = Some(Gid::from_raw(gid)); + if stdin { + let rd = pipe.rd.try_clone()?; + nix::unistd::fchown(rd.as_raw_fd(), uid, gid)?; + } else { + let wr = pipe.wr.try_clone()?; + nix::unistd::fchown(wr.as_raw_fd(), uid, gid)?; + } + Ok(Some(pipe)) + } +} + +/// IO driver to direct output/error messages to /dev/null. +/// +/// With this Io driver, all methods of [crate::Runc] can't capture the output/error messages. +#[derive(Debug)] +pub struct NullIo { + dev_null: std::sync::Mutex>, +} + +impl NullIo { + pub fn new() -> std::io::Result { + let f = std::fs::OpenOptions::new().read(true).open("/dev/null")?; + let dev_null = std::sync::Mutex::new(Some(f)); + Ok(Self { dev_null }) + } +} + +#[async_trait] +impl Io for NullIo { + async fn set(&self, cmd: &mut Command) -> std::io::Result<()> { + if let Some(null) = self.dev_null.lock().unwrap().as_ref() { + cmd.stdout(null.try_clone()?); + cmd.stderr(null.try_clone()?); + } + Ok(()) + } + + async fn close_after_start(&self) { + let mut m = self.dev_null.lock().unwrap(); + let _ = m.take(); + } +} + +/// Io driver based on Stdio::inherited(), to direct outputs/errors to stdio. +/// +/// With this Io driver, all methods of [crate::Runc] can't capture the output/error messages. +#[derive(Debug)] +pub struct InheritedStdIo {} + +impl InheritedStdIo { + pub fn new() -> std::io::Result { + Ok(InheritedStdIo {}) + } +} + +#[async_trait] +impl Io for InheritedStdIo { + async fn set(&self, cmd: &mut Command) -> std::io::Result<()> { + cmd.stdin(Stdio::null()) + .stdout(Stdio::inherit()) + .stderr(Stdio::inherit()); + Ok(()) + } + + async fn close_after_start(&self) {} +} + +/// Io driver based on Stdio::piped(), to capture outputs/errors from runC. +/// +/// With this Io driver, methods of [crate::Runc] may capture the output/error messages. +#[derive(Debug)] +pub struct PipedStdIo {} + +impl PipedStdIo { + pub fn new() -> std::io::Result { + Ok(PipedStdIo {}) + } +} +#[async_trait] +impl Io for PipedStdIo { + async fn set(&self, cmd: &mut Command) -> std::io::Result<()> { + cmd.stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + Ok(()) + } + + async fn close_after_start(&self) {} +} + +/// FIFO for the scenario that set FIFO for command Io. +#[derive(Debug)] +pub struct FIFO { + pub stdin: Option, + pub stdout: Option, + pub stderr: Option, +} +#[async_trait] +impl Io for FIFO { + async fn set(&self, cmd: &mut Command) -> Result<()> { + if let Some(path) = self.stdin.as_ref() { + let stdin = OpenOptions::new() + .read(true) + .custom_flags(libc::O_NONBLOCK) + .open(path) + .await?; + cmd.stdin(stdin.into_std().await); + } + + if let Some(path) = self.stdout.as_ref() { + let stdout = OpenOptions::new().write(true).open(path).await?; + cmd.stdout(stdout.into_std().await); + } + + if let Some(path) = self.stderr.as_ref() { + let stderr = OpenOptions::new().write(true).open(path).await?; + cmd.stderr(stderr.into_std().await); + } + + Ok(()) + } + + async fn close_after_start(&self) {} +} + +#[cfg(test)] +mod tests { + use super::*; + + #[cfg(not(target_os = "macos"))] + #[test] + fn test_io_option() { + let opts = IOOption { + open_stdin: false, + open_stdout: false, + open_stderr: false, + }; + let io = PipedIo::new(1000, 1000, &opts).unwrap(); + + assert!(io.stdin().is_none()); + assert!(io.stdout().is_none()); + assert!(io.stderr().is_none()); + } + + #[tokio::test] + async fn test_null_io() { + let io = NullIo::new().unwrap(); + assert!(io.stdin().is_none()); + assert!(io.stdout().is_none()); + assert!(io.stderr().is_none()); + } +} diff --git a/crates/runc/src/asynchronous/mod.rs b/crates/runc/src/asynchronous/mod.rs index ae7fa6ef..85bbf3ca 100644 --- a/crates/runc/src/asynchronous/mod.rs +++ b/crates/runc/src/asynchronous/mod.rs @@ -13,41 +13,36 @@ See the License for the specific language governing permissions and limitations under the License. */ - +pub mod io; mod pipe; use std::{fmt::Debug, io::Result, os::fd::AsRawFd}; +use async_trait::async_trait; use log::debug; pub use pipe::Pipe; use tokio::io::{AsyncRead, AsyncWrite}; use crate::Command; - +#[async_trait] pub trait Io: Debug + Send + Sync { - /// Return write side of stdin - #[cfg(feature = "async")] fn stdin(&self) -> Option> { None } - /// Return read side of stdout - #[cfg(feature = "async")] fn stdout(&self) -> Option> { None } - /// Return read side of stderr - #[cfg(feature = "async")] fn stderr(&self) -> Option> { None } /// Set IO for passed command. /// Read side of stdin, write side of stdout and write side of stderr should be provided to command. - fn set(&self, cmd: &mut Command) -> Result<()>; + async fn set(&self, cmd: &mut Command) -> Result<()>; /// Only close write side (should be stdout/err "from" runc process) - fn close_after_start(&self); + async fn close_after_start(&self); } #[derive(Debug)] @@ -56,7 +51,7 @@ pub struct PipedIo { pub stdout: Option, pub stderr: Option, } - +#[async_trait] impl Io for PipedIo { fn stdin(&self) -> Option> { self.stdin.as_ref().and_then(|pipe| { @@ -87,7 +82,7 @@ impl Io for PipedIo { // Note that this internally use [`std::fs::File`]'s `try_clone()`. // Thus, the files passed to commands will be not closed after command exit. - fn set(&self, cmd: &mut Command) -> std::io::Result<()> { + async fn set(&self, cmd: &mut Command) -> std::io::Result<()> { if let Some(p) = self.stdin.as_ref() { let pr = p.rd.try_clone()?; cmd.stdin(pr); @@ -106,7 +101,7 @@ impl Io for PipedIo { Ok(()) } - fn close_after_start(&self) { + async fn close_after_start(&self) { if let Some(p) = self.stdout.as_ref() { nix::unistd::close(p.wr.as_raw_fd()).unwrap_or_else(|e| debug!("close stdout: {}", e)); } diff --git a/crates/runc/src/asynchronous/pipe.rs b/crates/runc/src/asynchronous/pipe.rs index 83a8cb79..23c2e238 100644 --- a/crates/runc/src/asynchronous/pipe.rs +++ b/crates/runc/src/asynchronous/pipe.rs @@ -36,3 +36,72 @@ impl Pipe { Ok(Self { rd, wr }) } } + +#[cfg(test)] +mod tests { + use std::{ + io::{Read, Write}, + os::{fd::FromRawFd, unix::io::AsRawFd}, + }; + + use tokio::runtime::Runtime; + + use super::*; + + #[test] + fn test_pipe_creation() { + let pipe = Pipe::new().expect("Failed to create pipe"); + assert!(pipe.rd.as_raw_fd() >= 0, "Read file descriptor is invalid"); + assert!(pipe.wr.as_raw_fd() >= 0, "Write file descriptor is invalid"); + } + + #[test] + fn test_pipe_write_read() { + let pipe = Pipe::new().expect("Failed to create pipe"); + let mut read_end = unsafe { std::fs::File::from_raw_fd(pipe.rd.as_raw_fd()) }; + let mut write_end = unsafe { std::fs::File::from_raw_fd(pipe.wr.as_raw_fd()) }; + + let write_data = b"hello"; + write_end + .write_all(write_data) + .expect("Failed to write to pipe"); + + let mut read_data = vec![0; write_data.len()]; + read_end + .read_exact(&mut read_data) + .expect("Failed to read from pipe"); + + assert_eq!( + &read_data, write_data, + "Data read from pipe does not match data written" + ); + } + + #[test] + fn test_pipe_async_write_read() { + let pipe = Pipe::new().expect("Failed to create pipe"); + let mut read_end = unsafe { std::fs::File::from_raw_fd(pipe.rd.as_raw_fd()) }; + let mut write_end = unsafe { std::fs::File::from_raw_fd(pipe.wr.as_raw_fd()) }; + + let rt = Runtime::new().expect("Failed to create runtime"); + + rt.block_on(async { + let write_data = b"hello"; + tokio::spawn(async move { + write_end + .write_all(write_data) + .expect("Failed to write to pipe"); + }); + + let mut read_data = vec![0; write_data.len()]; + read_end + .read_exact(&mut read_data) + .expect("Failed to read from pipe"); + + assert_eq!( + &read_data, write_data, + "Data read from pipe does not match data written" + ); + }); + } +} diff --git a/crates/runc/src/lib.rs b/crates/runc/src/lib.rs index 53c18f04..fa5a7b6e 100644 --- a/crates/runc/src/lib.rs +++ b/crates/runc/src/lib.rs @@ -61,11 +61,12 @@ pub mod asynchronous; pub mod container; pub mod error; pub mod events; -pub mod io; +#[cfg(not(feature = "async"))] +pub mod synchronous; + #[cfg(feature = "async")] pub mod monitor; pub mod options; -pub mod synchronous; pub mod utils; pub type Result = std::result::Result; @@ -434,9 +435,9 @@ impl Runc { let mut cmd = self.command(&args)?; match opts { Some(CreateOpts { io: Some(io), .. }) => { - io.set(&mut cmd).map_err(Error::UnavailableIO)?; + io.set(&mut cmd).await.map_err(Error::UnavailableIO)?; let res = self.launch(cmd, true).await?; - io.close_after_start(); + io.close_after_start().await; Ok(res) } _ => self.launch(cmd, true).await, @@ -471,11 +472,13 @@ impl Runc { match opts { Some(ExecOpts { io: Some(io), .. }) => { tc!( - io.set(&mut cmd).map_err(|e| Error::IoSet(e.to_string())), + io.set(&mut cmd) + .await + .map_err(|e| Error::IoSet(e.to_string())), &f ); tc!(self.launch(cmd, true).await, &f); - io.close_after_start(); + io.close_after_start().await; } _ => { tc!(self.launch(cmd, true).await, &f); @@ -567,7 +570,9 @@ impl Runc { args.push(id.to_string()); let mut cmd = self.command(&args)?; if let Some(CreateOpts { io: Some(io), .. }) = opts { - io.set(&mut cmd).map_err(|e| Error::IoSet(e.to_string()))?; + io.set(&mut cmd) + .await + .map_err(|e| Error::IoSet(e.to_string()))?; }; let _ = self.launch(cmd, true).await?; Ok(()) @@ -655,7 +660,7 @@ impl Spawner for DefaultExecutor { mod tests { use std::sync::Arc; - use super::{ + use crate::{ io::{InheritedStdIo, PipedStdIo}, *, }; diff --git a/crates/runc/src/io.rs b/crates/runc/src/synchronous/io.rs similarity index 99% rename from crates/runc/src/io.rs rename to crates/runc/src/synchronous/io.rs index cb787c7e..2d3132cd 100644 --- a/crates/runc/src/io.rs +++ b/crates/runc/src/synchronous/io.rs @@ -25,7 +25,7 @@ use std::{ use nix::unistd::{Gid, Uid}; -pub use crate::Io; +use super::Io; use crate::{Command, Pipe, PipedIo}; #[derive(Debug, Clone)] @@ -209,7 +209,6 @@ mod tests { } #[cfg(target_os = "linux")] - #[cfg(not(feature = "async"))] #[test] fn test_create_piped_io() { use std::io::{Read, Write}; diff --git a/crates/runc/src/synchronous/mod.rs b/crates/runc/src/synchronous/mod.rs index 7ee8d35e..039b0d30 100644 --- a/crates/runc/src/synchronous/mod.rs +++ b/crates/runc/src/synchronous/mod.rs @@ -13,7 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. */ - +pub mod io; mod pipe; use std::{ fmt::Debug,