From 4ad535f9593b60618340f172547aea7d3f47962e Mon Sep 17 00:00:00 2001 From: Philip Metzger Date: Fri, 15 Dec 2023 19:14:18 +0100 Subject: [PATCH] WIP: run: Flesh out a bare implementation of `jj run`. This is basically an MVP based on `fix`; caching is not implemented yet. The core functionality is in `run_inner()` and `rewrite_commit()`. TODO: rewrite trees and expose all files --- Cargo.lock | 1 + cli/Cargo.toml | 2 + cli/src/commands/run.rs | 325 +++++++++++++++++++++++++- lib/src/default_working_copy_store.rs | 34 +-- lib/src/working_copy_store.rs | 41 ++-- 5 files changed, 360 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0677eb222e..ae859c0a82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1711,6 +1711,7 @@ dependencies = [ "textwrap", "thiserror", "timeago", + "tokio", "toml_edit", "tracing", "tracing-chrome", diff --git a/cli/Cargo.toml b/cli/Cargo.toml index fd1085b8b9..ea60299661 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -86,6 +86,8 @@ tempfile = { workspace = true } textwrap = { workspace = true } thiserror = { workspace = true } timeago = { workspace = true } +# TODO: Evaluate moving the Workqueue to another Crate, ideally `jj-util`. +tokio = { workspace = true } toml_edit = { workspace = true } tracing = { workspace = true } tracing-chrome = { workspace = true } diff --git a/cli/src/commands/run.rs b/cli/src/commands/run.rs index 4dc1c4417c..3c3fcd1f29 100644 --- a/cli/src/commands/run.rs +++ b/cli/src/commands/run.rs @@ -14,12 +14,254 @@ //! This file contains the internal implementation of `run`. 
-use itertools::Itertools as _; +use std::collections::HashSet; +use std::fs::File; +use std::io; +use std::ops::Deref; +use std::path::PathBuf; +use std::process::ExitStatus; +use std::sync::mpsc::Sender; -use crate::cli_util::{CommandHelper, RevisionArg}; +use clap::Command; +use futures::StreamExt; +use itertools::Itertools; +use jj_lib::backend::{CommitId, MergedTreeId, TreeValue}; +use jj_lib::commit::Commit; +use jj_lib::dag_walk::topo_order_forward_ok; +use jj_lib::matchers::EverythingMatcher; +use jj_lib::merged_tree::MergedTreeBuilder; +use jj_lib::object_id::ObjectId; +use jj_lib::repo::Repo; +use jj_lib::tree::Tree; +use pollster::FutureExt; +use tokio::runtime::Builder; +use tokio::task::JoinSet; +use tokio::{process, runtime, sync}; + +use crate::cli_util::{CommandHelper, RevisionArg, WorkspaceCommandTransaction}; use crate::command_error::{user_error, CommandError}; use crate::ui::Ui; +#[derive(Debug, thiserror::Error)] +enum RunError { + #[error("Couldn't create directory")] + NoDirectoryCreated, +} + +impl From for CommandError { + fn from(value: RunError) -> Self { + CommandError::new(crate::command_error::CommandErrorKind::Cli, Box::new(value)) + } +} + +/// Creates the required directories for a StoredWorkingCopy. +/// Returns a tuple of (`output_dir`, `working_copy` and `state`). +fn create_working_copy_paths( + path: &PathBuf, +) -> Result<(PathBuf, PathBuf, PathBuf), std::io::Error> { + let output = path.join("output"); + let working_copy = path.join("working_copy"); + let state = path.join("state"); + std::fs::create_dir(&output)?; + std::fs::create_dir(&working_copy)?; + std::fs::create_dir(&state)?; + Ok((output, working_copy, state)) +} + +/// Represent a `MergeTreeId` in a way that it may be used as a working-copy +/// name. This makes no stability guarantee, as the format may change at +/// any time. 
+fn to_wc_name(id: &MergedTreeId) -> String { + match id { + MergedTreeId::Legacy(tree_id) => tree_id.hex(), + MergedTreeId::Merge(tree_ids) => { + let ids = tree_ids + .map(|id| id.hex()) + .iter_mut() + .enumerate() + .map(|(i, s)| { + // Incredibly "smart" way to say, append "-" if the number is odd "+" + // otherwise. + if i & 1 != 0 { + s.push('-'); + } else { + s.push('+'); + } + s.to_owned() + }) + .collect_vec(); + let mut obfuscated: String = ids.concat(); + // `PATH_MAX` could be a problem for different operating systems, so truncate + // it. + if obfuscated.len() >= 255 { + obfuscated.truncate(200); + } + obfuscated + } + } +} + +fn get_runtime(jobs: usize) -> tokio::runtime::Handle { + let mut builder = Builder::new_multi_thread(); + if cfg!(watchman) { + // Watchman requires a multithreaded runtime, so just reuse it. + return runtime::Handle::current(); + } + if jobs == 1 { + builder.max_blocking_threads(1); + } else { + builder.max_blocking_threads(jobs); + } + let rt = builder.build().unwrap(); + rt.handle().clone() +} + +/// A commit stored under `.jj/run/` +// TODO: Create a caching backend, which creates these on a dedicated thread or +// threadpool. +struct StoredCommit { + /// Obfuscated name for an easier lookup. If a tree/directory its not set + name: Option, + /// The respective commit unmodified. 
+ commit: Commit, + output_dir: PathBuf, + working_copy_dir: PathBuf, + state_dir: PathBuf, + /// The `stdout` of the commit + stdout: File, + /// The `stderr` of the commit + stderr: File, +} + +impl StoredCommit { + fn new( + name: Option, + commit: &Commit, + output_dir: PathBuf, + working_copy_dir: PathBuf, + state_dir: PathBuf, + stdout: File, + stderr: File, + ) -> Self { + Self { + name, + commit: commit.clone(), + output_dir, + working_copy_dir, + state_dir, + stdout, + stderr, + } + } +} + +const BASE_PATH: &str = ".jj/run/default"; + +fn create_output_files(path: &PathBuf) -> Result<(File, File), io::Error> { + let _path = path; + Err(io::Error::last_os_error()) +} + +fn create_working_copies(commits: &[Commit]) -> Result, io::Error> { + let mut results = vec![]; + for commit in commits { + let name = to_wc_name(commit.tree_id()); + let base_path = PathBuf::new(); + let (output_dir, working_copy_dir, state_dir) = create_working_copy_paths(&base_path)?; + let (stdout, stderr) = create_output_files(&base_path)?; + + let stored_commit = StoredCommit::new( + Some(name), + commit, + output_dir, + working_copy_dir, + state_dir, + stdout, + stderr, + ); + results.push(stored_commit); + } + Ok(results) +} +/// The result of a single command invocation in `run_inner`. +enum RunJobResult { + /// A `Tree` and it's rewritten `CommitId` + Success { + /// The old `CommitId` of the commit. + old_id: CommitId, + /// The new `CommitId` for the commit. + rewritten_id: CommitId, + /// The new tree generated from the commit. + new_tree: Tree, + }, + /// The commands exit code + // TODO: use an actual error here. + Failure(ExitStatus), +} + +// TODO: make this more revset stream friendly. 
+async fn run_inner<'a>( + tx: WorkspaceCommandTransaction<'a>, + sender: Sender, + jobs: usize, + shell_command: &str, + commits: &[StoredCommit], +) -> Result<(), RunError> { + let mut command_futures = JoinSet::new(); + for commit in commits { + command_futures.spawn(rewrite_commit(tx, commit.deref(), shell_command)); + } + + while let Some(res) = command_futures.join_next().await { + let done = res?; + sender.send(done?); + } + Ok(()) +} + +/// Rewrite a single `StoredCommit`. +async fn rewrite_commit<'a>( + tx: WorkspaceCommandTransaction<'a>, + stored_commit: &StoredCommit, + shell_command: &str, +) -> Result { + let mut command_builder = tokio::process::Command::new("sh") + .args([shell_command]) + // TODO: relativize + // .env("JJ_PATH", stored_commit.tree_path) + .stdout(stored_commit.stdout) + .stderr(stored_commit.stderr); + let status = command_builder.status().await; + let mut paths = vec![]; + let mut file_ids = HashSet::new(); + // Paths modified in parent commits in the set should also be updated in this + // commit + let commit = stored_commit.commit; + for parent_id in commit.parent_ids() { + if let Some(parent_paths) = commit_paths.get(parent_id) { + paths.extend_from_slice(parent_paths); + } + } + let parent_tree = commit.parent_tree(tx.repo())?; + let tree = commit.tree()?; + let mut diff_stream = parent_tree.diff_stream(&tree, &EverythingMatcher); + while let Some((repo_path, diff)) = diff_stream.next().await { + let (_before, after) = diff?; + for term in after.into_iter().flatten() { + if let TreeValue::File { id, executable: _ } = term { + file_ids.insert((repo_path.clone(), id)); + paths.push(repo_path.clone()); + } + } + } + + Ok(RunJobResult::Success { + old_id: (), + rewritten_id: (), + new_tree: (), + }) +} + /// Run a command across a set of revisions. 
/// /// @@ -50,15 +292,15 @@ pub struct RunArgs { pub fn cmd_run(ui: &mut Ui, command: &CommandHelper, args: &RunArgs) -> Result<(), CommandError> { let workspace_command = command.workspace_helper(ui)?; - let _resolved_commits: Vec<_> = workspace_command - .parse_union_revsets(&args.revisions)? - .evaluate_to_commits() + let resolved_commits: Vec<_> = workspace_command + .parse_revset(&args.revisions)? + .evaluate_to_commits()? .try_collect()?; // Jobs are resolved in this order: // 1. Commandline argument iff > 0. // 2. the amount of cores available. // 3. a single job, if all of the above fails. - let _jobs = match args.jobs { + let jobs = match args.jobs { Some(0) => return Err(user_error("must pass at least one job")), Some(jobs) => Some(jobs), None => std::thread::available_parallelism().map(|t| t.into()).ok(), @@ -66,8 +308,73 @@ pub fn cmd_run(ui: &mut Ui, command: &CommandHelper, args: &RunArgs) -> Result<( // Fallback to a single user-visible job. .unwrap_or(1usize); - let repo = workspace_command.repo(); - let cache_backend = repo.working_copy_store(); - let _wc_copies = cache_backend.get_or_create_stores(_resolved_commits)?; + let (mut sender_tx, receiver) = std::sync::mpsc::channel(); + // let repo = workspace_command.repo(); + // let cache_backend = repo.working_copy_store(); + // let _wc_copies = cache_backend.get_or_create_stores(_resolved_commits)?; + + // Toposort the commits. + let topo_sorted_commits = topo_order_forward_ok( + resolved_commits.to_vec(), + |c: &Commit| c.id(), + |c: &Commit| c.parent_ids(), + )?; + let stored_commits = create_working_copies(&topo_sorted_commits)?; + + let tx = workspace_command.start_transaction(); + // Start all the jobs. + async { run_inner(tx, sender_tx, jobs, &args.shell_command, &stored_commits).await? } + .block_on(); + + // Wait until we have all results. 
+ loop { + let result = receiver.recv(); + if result.is_err() { + tracing::debug!("the"); + break; + } + match result { + RunJobResult::Success { + old_id, + new_id, + tree, + } => {} + RunJobResult::Failure(err) => {} + } + } + tx.mut_repo().transform_descendants( + command.settings(), + root_commits.iter().ids().cloned().collect_vec(), + |mut rewriter| { + let paths = commit_paths.get(rewriter.old_commit().id()).unwrap(); + let old_tree = rewriter.old_commit().tree()?; + let mut tree_builder = MergedTreeBuilder::new(old_tree.id().clone()); + for path in paths { + let old_value = old_tree.path_value(path); + let new_value = old_value.map(|old_term| { + if let Some(TreeValue::File { id, executable }) = old_term { + if let Some(new_id) = formatted.get(&(path, id)) { + Some(TreeValue::File { + id: new_id.clone(), + executable: *executable, + }) + } else { + old_term.clone() + } + } else { + old_term.clone() + } + }); + if new_value != old_value { + tree_builder.set_or_remove(path.clone(), new_value); + } + } + let new_tree = tree_builder.write_tree(rewriter.mut_repo().store())?; + let builder = rewriter.reparent(command.settings())?; + builder.set_tree_id(new_tree).write()?; + Ok(()) + }, + )?; + Err(user_error("This is a stub, do not use")) } diff --git a/lib/src/default_working_copy_store.rs b/lib/src/default_working_copy_store.rs index 91b7c236a6..6a8b806234 100644 --- a/lib/src/default_working_copy_store.rs +++ b/lib/src/default_working_copy_store.rs @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! This file contains the default implementation of the `WorkingCopyStore` for both the Git and -//! native Backend. It stores the working copies in the `.jj/run/default` path as directories. +//! This file contains the default implementation of the `WorkingCopyStore` for +//! both the Git and native Backend. It stores the working copies in the +//! `.jj/run/default` path as directories. 
use std::any::Any; use std::path::{Path, PathBuf}; -use std::sync::Arc; -use std::sync::OnceLock; +use std::sync::{Arc, OnceLock}; use itertools::Itertools; @@ -38,8 +38,9 @@ struct StoredWorkingCopy { commit: Commit, /// Current state of the associated [`WorkingCopy`]. state: Arc, - /// The output path for tools, which do not specify a location. Like C(++) Compilers, scripts and more. - /// It also contains the respective output stream, so stderr and stdout which was redirected for this commit. + /// The output path for tools, which do not specify a location. Like C(++) + /// Compilers, scripts and more. It also contains the respective output + /// stream, so stderr and stdout which was redirected for this commit. output_path: PathBuf, /// Path to the associated working copy. working_copy_path: PathBuf, @@ -72,8 +73,8 @@ impl StoredWorkingCopy { } } - /// Replace the currently cached working-copy and it's tree with the tree from `commit`. - /// Automatically marks it as used. + /// Replace the currently cached working-copy and it's tree with the tree + /// from `commit`. Automatically marks it as used. fn replace_with(&mut self, commit: &Commit) -> Result { let Self { commit: _, @@ -108,8 +109,8 @@ pub struct DefaultWorkingCopyStore { stored_paths: PathBuf, /// All managed working copies. stored_working_copies: Vec, - /// The store which owns this and all other backend related stuff. It gets set during the first - /// creation of the managed working copies. + /// The store which owns this and all other backend related stuff. It gets + /// set during the first creation of the managed working copies. store: OnceLock>, } @@ -150,7 +151,8 @@ fn to_wc_name(id: &MergedTreeId) -> String { }) .collect_vec(); let mut obfuscated: String = ids.concat(); - // `PATH_MAX` could be a problem for different operating systems, so truncate it. + // `PATH_MAX` could be a problem for different operating systems, so truncate + // it. 
if obfuscated.len() >= 255 { obfuscated.truncate(200); } @@ -239,9 +241,9 @@ impl WorkingCopyStore for DefaultWorkingCopyStore { !self.stored_working_copies.is_empty(), "we must have working copies after the first call" ); - // If we already have some existing working copies, try to minimize pending work. - // This is done by finding the intersection of the existing and new commits and only - // creating the non-overlapping revisions. + // If we already have some existing working copies, try to minimize pending + // work. This is done by finding the intersection of the existing and + // new commits and only creating the non-overlapping revisions. let new_revision_ids = revisions.iter().map(|rev| rev.id().clone()).collect_vec(); let contained_revisions = self .stored_working_copies @@ -249,8 +251,8 @@ impl WorkingCopyStore for DefaultWorkingCopyStore { .map(|sc| sc.commit.id().clone()) .collect_vec(); let contained_revset = RevsetExpression::commits(contained_revisions); - // intersect the existing revisions with the newly requested revisions to see which need to - // be replaced. + // intersect the existing revisions with the newly requested revisions to see + // which need to be replaced. let overlapping_commits_revset = &contained_revset.intersection(&RevsetExpression::commits(new_revision_ids)); let overlappping_commits: Vec = overlapping_commits_revset diff --git a/lib/src/working_copy_store.rs b/lib/src/working_copy_store.rs index 10cf55fe00..acc383b502 100644 --- a/lib/src/working_copy_store.rs +++ b/lib/src/working_copy_store.rs @@ -12,30 +12,33 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! This file contains the [`WorkingCopyStore`] interface which is used to cached working copies. +//! This file contains the [`WorkingCopyStore`] interface which is used to +//! cached working copies. //! //! These must be implemented for Virtual Filesystems such as [EdenFS] -//! 
to allow cheaper working copy materializations, they are used for the `jj run` -//! implementation. +//! to allow cheaper working copy materializations, they are used for the `jj +//! run` implementation. //! //! //! [EdenFS]: www.github.com/facebook/sapling/main/blob/eden/fs -use std::{any::Any, convert::Infallible, io}; +use std::any::Any; +use std::convert::Infallible; +use std::io; -use crate::{ - backend::{BackendError, CommitId}, - commit::Commit, - local_working_copy::LocalWorkingCopy, - repo::Repo, - revset::RevsetEvaluationError, -}; use thiserror::Error; +use crate::backend::{BackendError, CommitId}; +use crate::commit::Commit; +use crate::local_working_copy::LocalWorkingCopy; +use crate::repo::Repo; +use crate::revset::RevsetEvaluationError; + /// An Error from the Cache, which [`WorkingCopyStore`] represents. #[derive(Debug, Error)] pub enum WorkingCopyStoreError { - /// We failed to initialize something, the store or any underlying working-copies. + /// We failed to initialize something, the store or any underlying + /// working-copies. #[error("failed to initialize")] Initialization(#[from] io::Error), /// An error occured during a `CachedWorkingCopy` update. @@ -55,17 +58,19 @@ pub enum WorkingCopyStoreError { } /// A `WorkingCopyStore` manages the working copies on disk for `jj run`. -/// It's an ideal extension point for an virtual filesystem, as they ease the creation of -/// working copies. +/// It's an ideal extension point for a virtual filesystem, as they ease the +/// creation of working copies. /// -/// The trait's design is similar to a database. Clients request a single or multiple working-copies -/// and the backend can coalesce the requests if needed. This allows an implementation to build -/// a global view of all actively used working-copies and where they are stored. +/// The trait's design is similar to a database. Clients request a single or +/// multiple working-copies and the backend can coalesce the requests if needed. 
+/// This allows an implementation to build a global view of all actively used +/// working-copies and where they are stored. pub trait WorkingCopyStore: Send + Sync { /// Return `self` as `Any` to allow trait upcasting. fn as_any(&self) -> &dyn Any; - /// The name of the backend, determines how it actually interacts with working copies. + /// The name of the backend, determines how it actually interacts with + /// working copies. fn name(&self) -> &str; /// Get existing or create `Stores` for `revisions`.