Skip to content

Commit

Permalink
fix: address multiple issues with foreground controls for pipeline co…
Browse files Browse the repository at this point in the history
…mmands (#180)
  • Loading branch information
reubeno authored Sep 30, 2024
1 parent ee0b8e3 commit 2ddbf61
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 74 deletions.
38 changes: 14 additions & 24 deletions brush-core/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,8 @@ pub(crate) enum CommandSpawnResult {

impl CommandSpawnResult {
// TODO: jobs: remove `no_wait`; it doesn't make any sense
// TODO: jobs: figure out how to remove 'shell'
#[allow(clippy::too_many_lines)]
pub async fn wait(
self,
shell: &mut Shell,
no_wait: bool,
) -> Result<CommandWaitResult, error::Error> {
pub async fn wait(self, no_wait: bool) -> Result<CommandWaitResult, error::Error> {
#[allow(clippy::ignored_unit_patterns)]
match self {
CommandSpawnResult::SpawnedProcess(mut child) => {
Expand All @@ -61,10 +56,6 @@ impl CommandSpawnResult {
),
};

if shell.options.interactive {
sys::terminal::move_self_to_foreground()?;
}

Ok(command_wait_result)
}
CommandSpawnResult::ImmediateExit(exit_code) => Ok(
Expand Down Expand Up @@ -354,16 +345,15 @@ pub(crate) fn execute_external_command(
)?;

// Set up process group state.
let required_pgid = if new_pg {
let required_pgid = process_group_id.unwrap_or(0);

if new_pg {
// We need to set up a new process group.
#[cfg(unix)]
cmd.process_group(required_pgid);

required_pgid
} else {
0
};
cmd.process_group(0);
} else if let Some(pgid) = process_group_id {
// We need to join an established process group.
#[cfg(unix)]
cmd.process_group(*pgid);
}

// Register some code to run in the forked child process before it execs
// the target command.
Expand All @@ -377,7 +367,7 @@ pub(crate) fn execute_external_command(
// When tracing is enabled, report.
tracing::debug!(
target: trace_categories::COMMANDS,
"Spawning: pgid={required_pgid} cmd='{} {}'",
"Spawning: cmd='{} {}'",
cmd.get_program().to_string_lossy().to_string(),
cmd.get_args()
.map(|a| a.to_string_lossy().to_string())
Expand All @@ -387,11 +377,11 @@ pub(crate) fn execute_external_command(
match sys::process::spawn(cmd) {
Ok(child) => {
// Retrieve the pid.
let pid = child.id();
#[allow(clippy::cast_possible_wrap)]
let pid = child.id().map(|id| id as i32);
if let Some(pid) = &pid {
#[allow(clippy::cast_possible_wrap)]
if required_pgid == 0 {
*process_group_id = Some(*pid as i32);
if new_pg {
*process_group_id = Some(*pid);
}
} else {
tracing::warn!("could not retrieve pid for child process");
Expand Down
15 changes: 13 additions & 2 deletions brush-core/src/interp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ pub struct ExecutionParameters {
pub process_group_policy: ProcessGroupPolicy,
}

#[derive(Clone, Default)]
#[derive(Clone, Debug, Default)]
/// Policy for how to manage spawned external processes.
pub enum ProcessGroupPolicy {
/// Place the process in a new process group.
Expand Down Expand Up @@ -324,6 +324,12 @@ async fn spawn_pipeline_processes(
process_group_id,
params: params.clone(),
};

// Make sure that all commands in the pipeline are in the same process group.
if current_pipeline_index > 0 {
pipeline_context.params.process_group_policy = ProcessGroupPolicy::SameProcessGroup;
}

spawn_results.push_back(command.execute_in_pipeline(&mut pipeline_context).await?);
process_group_id = pipeline_context.process_group_id;
} else {
Expand All @@ -335,6 +341,7 @@ async fn spawn_pipeline_processes(
process_group_id,
params: params.clone(),
};

spawn_results.push_back(command.execute_in_pipeline(&mut pipeline_context).await?);
process_group_id = pipeline_context.process_group_id;
}
Expand All @@ -352,7 +359,7 @@ async fn wait_for_pipeline_processes(
let mut stopped_children = vec![];

while let Some(child) = process_spawn_results.pop_front() {
match child.wait(shell, !stopped_children.is_empty()).await? {
match child.wait(!stopped_children.is_empty()).await? {
commands::CommandWaitResult::CommandCompleted(current_result) => {
result = current_result;
shell.last_exit_status = result.exit_code;
Expand All @@ -366,6 +373,10 @@ async fn wait_for_pipeline_processes(
}
}

if shell.options.interactive {
sys::terminal::move_self_to_foreground()?;
}

// If there were stopped jobs, then encapsulate the pipeline as a managed job and hand it
// off to the job manager.
if !stopped_children.is_empty() {
Expand Down
6 changes: 3 additions & 3 deletions brush-core/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ pub struct Job {
tasks: VecDeque<JobTask>,

/// If available, the process group ID of the job's processes.
pgid: Option<u32>,
pgid: Option<sys::process::ProcessId>,

/// The annotation of the job (e.g., current, previous).
annotation: JobAnnotation,
Expand Down Expand Up @@ -417,7 +417,7 @@ impl Job {
}

/// Tries to retrieve a "representative" pid for the job.
pub fn get_representative_pid(&self) -> Option<u32> {
pub fn get_representative_pid(&self) -> Option<sys::process::ProcessId> {
for task in &self.tasks {
match task {
JobTask::External(p) => {
Expand All @@ -431,7 +431,7 @@ impl Job {
None
}

pub fn get_process_group_id(&self) -> Option<u32> {
pub fn get_process_group_id(&self) -> Option<sys::process::ProcessId> {
// TODO: Don't assume that the first PID is the PGID.
self.pgid.or_else(|| self.get_representative_pid())
}
Expand Down
6 changes: 3 additions & 3 deletions brush-core/src/processes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@ pub(crate) type WaitableChildProcess = std::pin::Pin<
/// Tracks a child process being awaited.
pub(crate) struct ChildProcess {
/// If available, the process ID of the child.
pid: Option<u32>,
pid: Option<sys::process::ProcessId>,
/// A waitable future that will yield the results of a child process's execution.
exec_future: WaitableChildProcess,
}

impl ChildProcess {
/// Wraps a child process and its future.
pub fn new(pid: Option<u32>, child: sys::process::Child) -> Self {
pub fn new(pid: Option<sys::process::ProcessId>, child: sys::process::Child) -> Self {
Self {
pid,
exec_future: Box::pin(child.wait_with_output()),
}
}

pub fn pid(&self) -> Option<u32> {
pub fn pid(&self) -> Option<sys::process::ProcessId> {
self.pid
}

Expand Down
2 changes: 2 additions & 0 deletions brush-core/src/sys/stubs/process.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub(crate) type ProcessId = i32;

pub(crate) struct Child {
inner: std::process::Child,
}
Expand Down
6 changes: 3 additions & 3 deletions brush-core/src/sys/stubs/signal.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{error, traps};
use crate::{error, sys, traps};

pub(crate) fn parse_numeric_signal(_signal: i32) -> Result<traps::TrapSignal, error::Error> {
Err(error::Error::InvalidSignal)
Expand All @@ -8,11 +8,11 @@ pub(crate) fn parse_os_signal_name(_signal: &str) -> Result<traps::TrapSignal, e
Err(error::Error::InvalidSignal)
}

pub(crate) fn continue_process(_pid: u32) -> Result<(), error::Error> {
pub(crate) fn continue_process(_pid: sys::process::ProcessId) -> Result<(), error::Error> {
error::unimp("continue process")
}

pub(crate) fn kill_process(_pid: u32) -> Result<(), error::Error> {
pub(crate) fn kill_process(_pid: sys::process::ProcessId) -> Result<(), error::Error> {
error::unimp("kill process")
}

Expand Down
10 changes: 5 additions & 5 deletions brush-core/src/sys/stubs/terminal.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::error;
use crate::{error, sys};

#[derive(Clone)]
pub(crate) struct TerminalSettings {}
Expand Down Expand Up @@ -32,19 +32,19 @@ pub(crate) fn is_stdin_a_terminal() -> Result<bool, error::Error> {
Ok(false)
}

pub(crate) fn get_parent_process_id() -> Option<u32> {
pub(crate) fn get_parent_process_id() -> Option<sys::process::ProcessId> {
None
}

pub(crate) fn get_process_group_id() -> Option<u32> {
pub(crate) fn get_process_group_id() -> Option<sys::process::ProcessId> {
None
}

pub(crate) fn get_foreground_pid() -> Option<u32> {
pub(crate) fn get_foreground_pid() -> Option<sys::process::ProcessId> {
None
}

pub(crate) fn move_to_foreground(_pid: u32) -> Result<(), error::Error> {
pub(crate) fn move_to_foreground(_pid: sys::process::ProcessId) -> Result<(), error::Error> {
Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions brush-core/src/sys/tokio_process.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub(crate) type ProcessId = i32;
pub(crate) use tokio::process::Child;

pub(crate) fn spawn(command: std::process::Command) -> std::io::Result<Child> {
Expand Down
21 changes: 7 additions & 14 deletions brush-core/src/sys/unix/signal.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::str::FromStr;

use crate::{error, traps};
use crate::{error, sys, traps};

pub(crate) fn parse_numeric_signal(signal: i32) -> Result<traps::TrapSignal, error::Error> {
Ok(traps::TrapSignal::Signal(
Expand All @@ -14,23 +14,16 @@ pub(crate) fn parse_os_signal_name(signal: &str) -> Result<traps::TrapSignal, er
))
}

pub(crate) fn continue_process(pid: u32) -> Result<(), error::Error> {
pub(crate) fn continue_process(pid: sys::process::ProcessId) -> Result<(), error::Error> {
#[allow(clippy::cast_possible_wrap)]
nix::sys::signal::kill(
nix::unistd::Pid::from_raw(pid as i32),
nix::sys::signal::SIGCONT,
)
.map_err(|_errno| error::Error::FailedToSendSignal)?;
nix::sys::signal::kill(nix::unistd::Pid::from_raw(pid), nix::sys::signal::SIGCONT)
.map_err(|_errno| error::Error::FailedToSendSignal)?;
Ok(())
}

pub(crate) fn kill_process(pid: u32) -> Result<(), error::Error> {
#[allow(clippy::cast_possible_wrap)]
nix::sys::signal::kill(
nix::unistd::Pid::from_raw(pid as i32),
nix::sys::signal::SIGKILL,
)
.map_err(|_errno| error::Error::FailedToSendSignal)?;
pub(crate) fn kill_process(pid: sys::process::ProcessId) -> Result<(), error::Error> {
nix::sys::signal::kill(nix::unistd::Pid::from_raw(pid), nix::sys::signal::SIGKILL)
.map_err(|_errno| error::Error::FailedToSendSignal)?;

Ok(())
}
Expand Down
26 changes: 9 additions & 17 deletions brush-core/src/sys/unix/terminal.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::error;
use crate::{error, sys};
use std::os::fd::{AsFd, AsRawFd};

#[derive(Clone)]
Expand Down Expand Up @@ -48,31 +48,23 @@ pub(crate) fn is_stdin_a_terminal() -> Result<bool, error::Error> {
}

#[allow(clippy::unnecessary_wraps)]
pub(crate) fn get_parent_process_id() -> Option<u32> {
#[allow(clippy::cast_sign_loss)]
{
Some(nix::unistd::getppid().as_raw() as u32)
}
pub(crate) fn get_parent_process_id() -> Option<sys::process::ProcessId> {
Some(nix::unistd::getppid().as_raw())
}

#[allow(clippy::unnecessary_wraps)]
pub(crate) fn get_process_group_id() -> Option<u32> {
#[allow(clippy::cast_sign_loss)]
{
Some(nix::unistd::getpgrp().as_raw() as u32)
}
pub(crate) fn get_process_group_id() -> Option<sys::process::ProcessId> {
Some(nix::unistd::getpgrp().as_raw())
}

pub(crate) fn get_foreground_pid() -> Option<u32> {
#[allow(clippy::cast_sign_loss)]
pub(crate) fn get_foreground_pid() -> Option<sys::process::ProcessId> {
nix::unistd::tcgetpgrp(std::io::stdin())
.ok()
.map(|pgid| pgid.as_raw() as u32)
.map(|pgid| pgid.as_raw())
}

pub(crate) fn move_to_foreground(pid: u32) -> Result<(), error::Error> {
#[allow(clippy::cast_possible_wrap)]
nix::unistd::tcsetpgrp(std::io::stdin(), nix::unistd::Pid::from_raw(pid as i32))?;
pub(crate) fn move_to_foreground(pid: sys::process::ProcessId) -> Result<(), error::Error> {
nix::unistd::tcsetpgrp(std::io::stdin(), nix::unistd::Pid::from_raw(pid))?;
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion brush-core/src/terminal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{error, sys};
/// Encapsulates the state of a controlled terminal.
#[allow(clippy::module_name_repetitions)]
pub struct TerminalControl {
prev_fg_pid: Option<u32>,
prev_fg_pid: Option<sys::process::ProcessId>,
}

impl TerminalControl {
Expand Down
3 changes: 1 addition & 2 deletions brush-shell/tests/interactive_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ fn run_in_bg_then_fg() -> anyhow::Result<()> {
Ok(())
}

#[ignore] // TODO: Fix this test!
#[test]
fn run_pipeline_interactively() -> anyhow::Result<()> {
let mut session = start_shell_session()?;
Expand All @@ -99,7 +98,7 @@ fn run_pipeline_interactively() -> anyhow::Result<()> {
.context("Echoed text didn't show up")?;
session.send("h")?;
session
.expect("SUMMARY OF LESS COMMANDS")
.expect("SUMMARY")
.context("less help didn't show up")?;
session.send("q")?;
session.send("q")?;
Expand Down

0 comments on commit 2ddbf61

Please sign in to comment.