Skip to content

Commit

Permalink
Merge branch 'main' into lockdev
Browse files Browse the repository at this point in the history
Signed-off-by: jokemanfire <[email protected]>
  • Loading branch information
jokemanfire authored Dec 14, 2024
2 parents 6121223 + a0f3bf1 commit 73ac75e
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 56 deletions.
18 changes: 18 additions & 0 deletions crates/runc-shim/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use std::{
env,
fs::File,
future::Future,
io::IoSliceMut,
ops::Deref,
os::{
Expand All @@ -25,6 +26,7 @@ use std::{
},
path::Path,
sync::Arc,
time::Duration,
};

use containerd_shim::{
Expand Down Expand Up @@ -59,6 +61,8 @@ pub const INIT_PID_FILE: &str = "init.pid";
pub const LOG_JSON_FILE: &str = "log.json";
pub const FIFO_SCHEME: &str = "fifo";

const TIMEOUT_DURATION: std::time::Duration = Duration::from_secs(3);

#[derive(Deserialize)]
pub struct Log {
pub level: String,
Expand Down Expand Up @@ -248,3 +252,17 @@ pub(crate) fn xdg_runtime_dir() -> String {
env::var("XDG_RUNTIME_DIR")
.unwrap_or_else(|_| env::temp_dir().to_str().unwrap_or(".").to_string())
}

pub async fn handle_file_open<F, Fut>(file_op: F) -> Result<tokio::fs::File, tokio::io::Error>
where
F: FnOnce() -> Fut,
Fut: Future<Output = Result<tokio::fs::File, tokio::io::Error>> + Send,
{
match tokio::time::timeout(TIMEOUT_DURATION, file_op()).await {
Ok(result) => result,
Err(_) => Err(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"File operation timed out",
)),
}
}
6 changes: 6 additions & 0 deletions crates/runc-shim/src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ where
async fn state(&self, exec_id: Option<&str>) -> Result<StateResponse> {
let process = self.get_process(exec_id)?;
let mut resp = process.state().await?;
let init_state = self.init.state().await?.status;
if init_state == EnumOrUnknown::new(Status::PAUSING)
|| init_state == EnumOrUnknown::new(Status::PAUSED)
{
resp.status = init_state;
}
resp.bundle = self.bundle.to_string();
debug!("container state: {:?}", resp);
Ok(resp)
Expand Down
5 changes: 5 additions & 0 deletions crates/runc-shim/src/processes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub trait Process {
async fn close_io(&mut self) -> Result<()>;
async fn pause(&mut self) -> Result<()>;
async fn resume(&mut self) -> Result<()>;
async fn id(&self) -> &str;
}

#[async_trait]
Expand Down Expand Up @@ -123,6 +124,10 @@ where
self.pid
}

async fn id(&self) -> &str {
self.id.as_str()
}

async fn state(&self) -> Result<StateResponse> {
let mut resp = StateResponse::new();
resp.id = self.id.to_string();
Expand Down
82 changes: 50 additions & 32 deletions crates/runc-shim/src/runc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ use super::{
};
use crate::{
common::{
check_kill_error, create_io, create_runc, get_spec_from_request, receive_socket,
CreateConfig, Log, ProcessIO, ShimExecutor, INIT_PID_FILE, LOG_JSON_FILE,
check_kill_error, create_io, create_runc, get_spec_from_request, handle_file_open,
receive_socket, CreateConfig, Log, ProcessIO, ShimExecutor, INIT_PID_FILE, LOG_JSON_FILE,
},
io::Stdio,
};
Expand Down Expand Up @@ -538,11 +538,14 @@ async fn copy_console(
.try_clone()
.await
.map_err(io_error!(e, "failed to clone console file"))?;
let stdin = OpenOptions::new()
.read(true)
.open(stdio.stdin.as_str())
.await
.map_err(io_error!(e, "failed to open stdin"))?;
let stdin = handle_file_open(|| async {
OpenOptions::new()
.read(true)
.open(stdio.stdin.as_str())
.await
})
.await
.map_err(io_error!(e, "failed to open stdin"))?;
spawn_copy(stdin, console_stdin, exit_signal.clone(), None::<fn()>);
}

Expand Down Expand Up @@ -587,30 +590,39 @@ pub async fn copy_io(pio: &ProcessIO, stdio: &Stdio, exit_signal: Arc<ExitSignal
if let Some(w) = io.stdin() {
debug!("copy_io: pipe stdin from {}", stdio.stdin.as_str());
if !stdio.stdin.is_empty() {
let stdin = OpenOptions::new()
.read(true)
.open(stdio.stdin.as_str())
.await
.map_err(io_error!(e, "open stdin"))?;
let stdin = handle_file_open(|| async {
OpenOptions::new()
.read(true)
.open(stdio.stdin.as_str())
.await
})
.await
.map_err(io_error!(e, "open stdin"))?;
spawn_copy(stdin, w, exit_signal.clone(), None::<fn()>);
}
}

if let Some(r) = io.stdout() {
debug!("copy_io: pipe stdout from to {}", stdio.stdout.as_str());
if !stdio.stdout.is_empty() {
let stdout = OpenOptions::new()
.write(true)
.open(stdio.stdout.as_str())
.await
.map_err(io_error!(e, "open stdout"))?;
let stdout = handle_file_open(|| async {
OpenOptions::new()
.write(true)
.open(stdio.stdout.as_str())
.await
})
.await
.map_err(io_error!(e, "open stdout"))?;
// open a read to make sure even if the read end of containerd shutdown,
// copy still continue until the restart of containerd succeed
let stdout_r = OpenOptions::new()
.read(true)
.open(stdio.stdout.as_str())
.await
.map_err(io_error!(e, "open stdout for read"))?;
let stdout_r = handle_file_open(|| async {
OpenOptions::new()
.read(true)
.open(stdio.stdout.as_str())
.await
})
.await
.map_err(io_error!(e, "open stdout for read"))?;
spawn_copy(
r,
stdout,
Expand All @@ -625,18 +637,24 @@ pub async fn copy_io(pio: &ProcessIO, stdio: &Stdio, exit_signal: Arc<ExitSignal
if let Some(r) = io.stderr() {
if !stdio.stderr.is_empty() {
debug!("copy_io: pipe stderr from to {}", stdio.stderr.as_str());
let stderr = OpenOptions::new()
.write(true)
.open(stdio.stderr.as_str())
.await
.map_err(io_error!(e, "open stderr"))?;
let stderr = handle_file_open(|| async {
OpenOptions::new()
.write(true)
.open(stdio.stderr.as_str())
.await
})
.await
.map_err(io_error!(e, "open stderr"))?;
// open a read to make sure even if the read end of containerd shutdown,
// copy still continue until the restart of containerd succeed
let stderr_r = OpenOptions::new()
.read(true)
.open(stdio.stderr.as_str())
.await
.map_err(io_error!(e, "open stderr for read"))?;
let stderr_r = handle_file_open(|| async {
OpenOptions::new()
.read(true)
.open(stdio.stderr.as_str())
.await
})
.await
.map_err(io_error!(e, "open stderr for read"))?;
spawn_copy(
r,
stderr,
Expand Down
54 changes: 30 additions & 24 deletions crates/runc-shim/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl Shim for Service {
let namespace = self.namespace.as_str();
let bundle = current_dir().map_err(io_error!(e, "get current dir"))?;
let opts = read_options(&bundle).await?;
let runtime = read_runtime(&bundle).await?;
let runtime = read_runtime(&bundle).await.unwrap_or_default();

let runc = create_runc(
&runtime,
Expand All @@ -117,7 +117,9 @@ impl Shim for Service {
&opts,
Some(Arc::new(ShimExecutor::default())),
)?;
let pid = read_pid_from_file(&bundle.join(INIT_PID_FILE)).await?;
let pid = read_pid_from_file(&bundle.join(INIT_PID_FILE))
.await
.unwrap_or_default();

runc.delete(&self.id, Some(&DeleteOpts { force: true }))
.await
Expand Down Expand Up @@ -161,6 +163,8 @@ async fn process_exits(
let exit_code = e.exit_code;
for (_k, cont) in containers.write().await.iter_mut() {
let bundle = cont.bundle.to_string();
let container_id = cont.id.clone();
let mut change_process: Vec<&mut (dyn Process + Send + Sync)> = Vec::new();
// pid belongs to container init process
if cont.init.pid == pid {
// kill all children process if the container has a private PID namespace
Expand All @@ -169,20 +173,30 @@ async fn process_exits(
error!("failed to kill init's children: {}", e)
});
}
// set exit for init process
cont.init.set_exited(exit_code).await;

if let Ok(process_d) = cont.get_mut_process(None) {
change_process.push(process_d);
} else {
break;
}
} else {
// pid belongs to container common process
if let Some((_, p)) = cont.processes.iter_mut().find(|(_, p)| p.pid == pid)
{
change_process.push(p as &mut (dyn Process + Send + Sync));
}
}
let process_len = change_process.len();
for process in change_process {
// set exit for process
process.set_exited(exit_code).await;
let code = process.exit_code().await;
let exited_at = process.exited_at().await;
// publish event
let (_, code, exited_at) = match cont.get_exit_info(None).await {
Ok(info) => info,
Err(_) => break,
};

let ts = convert_to_timestamp(exited_at);
let event = TaskExit {
container_id: cont.id.to_string(),
id: cont.id.to_string(),
pid: cont.pid().await as u32,
container_id: container_id.clone(),
id: process.id().await.to_string(),
pid: process.pid().await as u32,
exit_status: code as u32,
exited_at: Some(ts).into(),
..Default::default()
Expand All @@ -191,18 +205,10 @@ async fn process_exits(
tx.send((topic.to_string(), Box::new(event)))
.await
.unwrap_or_else(|e| warn!("send {} to publisher: {}", topic, e));

break;
}

// pid belongs to container common process
for (_exec_id, p) in cont.processes.iter_mut() {
// set exit for exec process
if p.pid == pid {
p.set_exited(exit_code).await;
// TODO: publish event
break;
}
//if process has been find , no need to keep search
if process_len != 0 {
break;
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions crates/shim/src/asynchronous/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,13 @@ pub async fn read_spec(bundle: impl AsRef<Path>) -> Result<Spec> {
serde_json::from_str::<Spec>(content.as_str()).map_err(other_error!("read spec"))
}

// read_options reads the option information from the path.
// When the file does not exist, read_options returns nil without an error.
pub async fn read_options(bundle: impl AsRef<Path>) -> Result<Options> {
let path = bundle.as_ref().join(OPTIONS_FILE_NAME);
if !path.exists() {
return Ok(Options::default());
}
let opts_str = read_file_to_str(path).await?;
let opts =
serde_json::from_str::<JsonOptions>(&opts_str).map_err(other_error!("read options"))?;
Expand Down

0 comments on commit 73ac75e

Please sign in to comment.