Skip to content

Commit

Permalink
OpenOptions in blocking mod is a time-consuming operation.
Browse files Browse the repository at this point in the history
1.Prevent it from blocking a tokio thread. Change sync to async.
2.Add pipe unit test which I found error in Pipe new.

Signed-off-by: jokemanfire <[email protected]>
  • Loading branch information
jokemanfire authored and mxpv committed Jan 9, 2025
1 parent 41d2ded commit a345bac
Show file tree
Hide file tree
Showing 6 changed files with 332 additions and 39 deletions.
219 changes: 219 additions & 0 deletions crates/runc/src/asynchronous/io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

use std::{fmt::Debug, io::Result, os::unix::io::AsRawFd, process::Stdio};

use async_trait::async_trait;
use nix::unistd::{Gid, Uid};
use tokio::fs::OpenOptions;

pub use crate::Io;
use crate::{Command, Pipe, PipedIo};

#[derive(Debug, Clone)]
pub struct IOOption {
pub open_stdin: bool,
pub open_stdout: bool,
pub open_stderr: bool,
}

impl Default for IOOption {
fn default() -> Self {
Self {
open_stdin: true,
open_stdout: true,
open_stderr: true,
}
}
}

impl PipedIo {
pub fn new(uid: u32, gid: u32, opts: &IOOption) -> std::io::Result<Self> {
Ok(Self {
stdin: if opts.open_stdin {
Self::create_pipe(uid, gid, true)?
} else {
None
},
stdout: if opts.open_stdout {
Self::create_pipe(uid, gid, true)?
} else {
None
},
stderr: if opts.open_stderr {
Self::create_pipe(uid, gid, true)?
} else {
None
},
})
}

fn create_pipe(uid: u32, gid: u32, stdin: bool) -> std::io::Result<Option<Pipe>> {
let pipe = Pipe::new()?;
let uid = Some(Uid::from_raw(uid));
let gid = Some(Gid::from_raw(gid));
if stdin {
let rd = pipe.rd.try_clone()?;
nix::unistd::fchown(rd.as_raw_fd(), uid, gid)?;
} else {
let wr = pipe.wr.try_clone()?;
nix::unistd::fchown(wr.as_raw_fd(), uid, gid)?;
}
Ok(Some(pipe))
}
}

/// IO driver to direct output/error messages to /dev/null.
///
/// With this Io driver, all methods of [crate::Runc] can't capture the output/error messages.
#[derive(Debug)]
pub struct NullIo {
dev_null: std::sync::Mutex<Option<std::fs::File>>,
}

impl NullIo {
pub fn new() -> std::io::Result<Self> {
let f = std::fs::OpenOptions::new().read(true).open("/dev/null")?;
let dev_null = std::sync::Mutex::new(Some(f));
Ok(Self { dev_null })
}
}

#[async_trait]
impl Io for NullIo {
async fn set(&self, cmd: &mut Command) -> std::io::Result<()> {
if let Some(null) = self.dev_null.lock().unwrap().as_ref() {
cmd.stdout(null.try_clone()?);
cmd.stderr(null.try_clone()?);
}
Ok(())
}

async fn close_after_start(&self) {
let mut m = self.dev_null.lock().unwrap();
let _ = m.take();
}
}

/// Io driver based on Stdio::inherited(), to direct outputs/errors to stdio.
///
/// With this Io driver, all methods of [crate::Runc] can't capture the output/error messages.
#[derive(Debug)]
pub struct InheritedStdIo {}

impl InheritedStdIo {
pub fn new() -> std::io::Result<Self> {
Ok(InheritedStdIo {})
}
}

#[async_trait]
impl Io for InheritedStdIo {
async fn set(&self, cmd: &mut Command) -> std::io::Result<()> {
cmd.stdin(Stdio::null())
.stdout(Stdio::inherit())
.stderr(Stdio::inherit());
Ok(())
}

async fn close_after_start(&self) {}
}

/// Io driver based on Stdio::piped(), to capture outputs/errors from runC.
///
/// With this Io driver, methods of [crate::Runc] may capture the output/error messages.
#[derive(Debug)]
pub struct PipedStdIo {}

impl PipedStdIo {
pub fn new() -> std::io::Result<Self> {
Ok(PipedStdIo {})
}
}
#[async_trait]
impl Io for PipedStdIo {
async fn set(&self, cmd: &mut Command) -> std::io::Result<()> {
cmd.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
Ok(())
}

async fn close_after_start(&self) {}
}

/// FIFO for the scenario that set FIFO for command Io.
#[derive(Debug)]
pub struct FIFO {
pub stdin: Option<String>,
pub stdout: Option<String>,
pub stderr: Option<String>,
}
#[async_trait]
impl Io for FIFO {
async fn set(&self, cmd: &mut Command) -> Result<()> {
if let Some(path) = self.stdin.as_ref() {
let stdin = OpenOptions::new()
.read(true)
.custom_flags(libc::O_NONBLOCK)
.open(path)
.await?;
cmd.stdin(stdin.into_std().await);
}

if let Some(path) = self.stdout.as_ref() {
let stdout = OpenOptions::new().write(true).open(path).await?;
cmd.stdout(stdout.into_std().await);
}

if let Some(path) = self.stderr.as_ref() {
let stderr = OpenOptions::new().write(true).open(path).await?;
cmd.stderr(stderr.into_std().await);
}

Ok(())
}

async fn close_after_start(&self) {}
}

#[cfg(test)]
mod tests {
use super::*;

#[cfg(not(target_os = "macos"))]
#[test]
fn test_io_option() {
let opts = IOOption {
open_stdin: false,
open_stdout: false,
open_stderr: false,
};
let io = PipedIo::new(1000, 1000, &opts).unwrap();

assert!(io.stdin().is_none());
assert!(io.stdout().is_none());
assert!(io.stderr().is_none());
}

#[tokio::test]
async fn test_null_io() {
let io = NullIo::new().unwrap();
assert!(io.stdin().is_none());
assert!(io.stdout().is_none());
assert!(io.stderr().is_none());
}
}
21 changes: 8 additions & 13 deletions crates/runc/src/asynchronous/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,41 +13,36 @@
See the License for the specific language governing permissions and
limitations under the License.
*/

pub mod io;
mod pipe;
use std::{fmt::Debug, io::Result, os::fd::AsRawFd};

use async_trait::async_trait;
use log::debug;
pub use pipe::Pipe;
use tokio::io::{AsyncRead, AsyncWrite};

use crate::Command;

#[async_trait]
pub trait Io: Debug + Send + Sync {
/// Return write side of stdin
#[cfg(feature = "async")]
fn stdin(&self) -> Option<Box<dyn AsyncWrite + Send + Sync + Unpin>> {
None
}

/// Return read side of stdout
#[cfg(feature = "async")]
fn stdout(&self) -> Option<Box<dyn AsyncRead + Send + Sync + Unpin>> {
None
}

/// Return read side of stderr
#[cfg(feature = "async")]
fn stderr(&self) -> Option<Box<dyn AsyncRead + Send + Sync + Unpin>> {
None
}

/// Set IO for passed command.
/// Read side of stdin, write side of stdout and write side of stderr should be provided to command.
fn set(&self, cmd: &mut Command) -> Result<()>;
async fn set(&self, cmd: &mut Command) -> Result<()>;

/// Only close write side (should be stdout/err "from" runc process)
fn close_after_start(&self);
async fn close_after_start(&self);
}

#[derive(Debug)]
Expand All @@ -56,7 +51,7 @@ pub struct PipedIo {
pub stdout: Option<Pipe>,
pub stderr: Option<Pipe>,
}

#[async_trait]
impl Io for PipedIo {
fn stdin(&self) -> Option<Box<dyn AsyncWrite + Send + Sync + Unpin>> {
self.stdin.as_ref().and_then(|pipe| {
Expand Down Expand Up @@ -87,7 +82,7 @@ impl Io for PipedIo {

// Note that this internally use [`std::fs::File`]'s `try_clone()`.
// Thus, the files passed to commands will be not closed after command exit.
fn set(&self, cmd: &mut Command) -> std::io::Result<()> {
async fn set(&self, cmd: &mut Command) -> std::io::Result<()> {
if let Some(p) = self.stdin.as_ref() {
let pr = p.rd.try_clone()?;
cmd.stdin(pr);
Expand All @@ -106,7 +101,7 @@ impl Io for PipedIo {
Ok(())
}

fn close_after_start(&self) {
async fn close_after_start(&self) {
if let Some(p) = self.stdout.as_ref() {
nix::unistd::close(p.wr.as_raw_fd()).unwrap_or_else(|e| debug!("close stdout: {}", e));
}
Expand Down
76 changes: 74 additions & 2 deletions crates/runc/src/asynchronous/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,80 @@ pub struct Pipe {
impl Pipe {
pub fn new() -> std::io::Result<Self> {
let (tx, rx) = pipe::pipe()?;
let rd = tx.into_blocking_fd()?;
let wr = rx.into_blocking_fd()?;
let rd = rx.into_blocking_fd()?;
let wr = tx.into_blocking_fd()?;
Ok(Self { rd, wr })
}
}

#[cfg(test)]
mod tests {
use std::os::fd::IntoRawFd;

use tokio::io::{AsyncReadExt, AsyncWriteExt};

use super::*;

#[tokio::test]
async fn test_pipe_creation() {
let pipe = Pipe::new().expect("Failed to create pipe");
assert!(
pipe.rd.into_raw_fd() >= 0,
"Read file descriptor is invalid"
);
assert!(
pipe.wr.into_raw_fd() >= 0,
"Write file descriptor is invalid"
);
}

#[tokio::test]
async fn test_pipe_write_read() {
let pipe = Pipe::new().expect("Failed to create pipe");
let mut read_end = pipe::Receiver::from_owned_fd(pipe.rd).unwrap();
let mut write_end = pipe::Sender::from_owned_fd(pipe.wr).unwrap();
let write_data = b"hello";

write_end
.write_all(write_data)
.await
.expect("Failed to write to pipe");

let mut read_data = vec![0; write_data.len()];
read_end
.read_exact(&mut read_data)
.await
.expect("Failed to read from pipe");

assert_eq!(
read_data, write_data,
"Data read from pipe does not match data written"
);
}

#[tokio::test]
async fn test_pipe_async_write_read() {
let pipe = Pipe::new().expect("Failed to create pipe");
let mut read_end = pipe::Receiver::from_owned_fd(pipe.rd).unwrap();
let mut write_end = pipe::Sender::from_owned_fd(pipe.wr).unwrap();

let write_data = b"hello";
tokio::spawn(async move {
write_end
.write_all(write_data)
.await
.expect("Failed to write to pipe");
});

let mut read_data = vec![0; write_data.len()];
read_end
.read_exact(&mut read_data)
.await
.expect("Failed to read from pipe");

assert_eq!(
&read_data, write_data,
"Data read from pipe does not match data written"
);
}
}
Loading

0 comments on commit a345bac

Please sign in to comment.