WIP: run: Flesh out a bare implementation of jj run.
This is basically an MVP based on `fix`; caching is not implemented yet.
The core functionality is in `run_inner()` and `rewrite_commit()`.

TODO: rewrite trees and expose all files
PhilipMetzger committed Jun 20, 2024
1 parent d099dfd commit e294e99
Showing 3 changed files with 215 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions cli/Cargo.toml
@@ -86,6 +86,8 @@ tempfile = { workspace = true }
textwrap = { workspace = true }
thiserror = { workspace = true }
timeago = { workspace = true }
# TODO: Evaluate moving the workqueue to another crate, ideally `jj-util`.
tokio = { workspace = true }
toml_edit = { workspace = true }
tracing = { workspace = true }
tracing-chrome = { workspace = true }
216 changes: 212 additions & 4 deletions cli/src/commands/run.rs
@@ -14,11 +14,157 @@

//! This file contains the internal implementation of `run`.
use std::path::{Path, PathBuf};
use std::process::ExitStatus;

use futures::StreamExt;
use itertools::Itertools as _;
use jj_lib::backend::{CommitId, MergedTreeId, TreeValue};
use jj_lib::commit::{Commit, CommitIteratorExt as _};
use jj_lib::dag_walk::topo_order_forward_ok;
use jj_lib::matchers::EverythingMatcher;
use jj_lib::merged_tree::MergedTreeBuilder;
use jj_lib::transaction::Transaction;
use jj_lib::tree::Tree;
use jj_lib::working_copy_store::{CachedWorkingCopy, WorkingCopyStoreError};
use pollster::FutureExt as _;
use thiserror::Error;
use tokio::process::Command;
use tokio::task::JoinSet;

use crate::cli_util::{CommandHelper, RevisionArg};
use crate::command_error::{user_error, CommandError};
use crate::ui::Ui;

/// Creates the required directories for a StoredWorkingCopy.
/// Returns a tuple of (`output`, `working_copy`, `state`) paths.
fn create_working_copy_paths(
    path: &Path,
) -> Result<(PathBuf, PathBuf, PathBuf), std::io::Error> {
    let output = path.join("output");
    let working_copy = path.join("working_copy");
    let state = path.join("state");
    std::fs::create_dir(&output)?;
    std::fs::create_dir(&working_copy)?;
    std::fs::create_dir(&state)?;
    Ok((output, working_copy, state))
}

/// Represent a `MergedTreeId` in a way that it can be used as a working-copy
/// name. This makes no stability guarantee, as the format may change at
/// any time.
fn to_wc_name(id: &MergedTreeId) -> String {
    match id {
        MergedTreeId::Legacy(tree_id) => tree_id.hex(),
        MergedTreeId::Merge(tree_ids) => {
            let ids = tree_ids
                .map(|id| id.hex())
                .iter_mut()
                .enumerate()
                .map(|(i, s)| {
                    // Append "-" if the index is odd, "+" otherwise.
                    if i & 1 != 0 {
                        s.push('-');
                    } else {
                        s.push('+');
                    }
                    s.to_owned()
                })
                .collect_vec();
            let mut obfuscated: String = ids.concat();
            // Truncate the name, as it could otherwise exceed the maximum file
            // name length (255 bytes on common filesystems).
            if obfuscated.len() >= 255 {
                obfuscated.truncate(200);
            }
            obfuscated
        }
    }
}

/// A commit stored under `.jj/run/`.
// TODO: Create a caching backend, which creates these on a dedicated thread or
// threadpool.
struct StoredCommit {
    /// The commit whose working copy is stored on disk.
    commit: Commit,
    /// The path to the commit's working-copy directory under `.jj/run/`.
    path: PathBuf,
}

/// The result of a single command invocation in `run_inner`.
enum RunJobResult {
    /// A new `Tree` and the rewritten `CommitId`.
    Success {
        /// The new `CommitId` for the commit.
        rewritten_id: CommitId,
        /// The new tree generated from the commit.
        new_tree: Tree,
    },
    /// The command's exit code.
    // TODO: use an actual error here.
    Failure(ExitStatus),
}
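
`run_inner` below returns `Result<(), RunError>`, but `RunError` is not defined in the hunks shown here. A minimal placeholder consistent with the `thiserror` import might look like this (the exact variants are an assumption, not part of the commit):

/// Placeholder error type for `run_inner`; the shown hunks do not define it.
#[derive(Debug, Error)]
enum RunError {
    /// An error from the working-copy cache backend.
    #[error(transparent)]
    WorkingCopyStore(#[from] WorkingCopyStoreError),
    /// A filesystem error while preparing the working copies.
    #[error(transparent)]
    Io(#[from] std::io::Error),
}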

// TODO: make this more revset stream friendly.
async fn run_inner(
    tx: Transaction,
    sender: std::sync::mpsc::Sender<RunJobResult>,
    jobs: usize,
    shell_command: &str,
    commits: &[StoredCommit],
) -> Result<(), RunError> {
    let mut command_futures = JoinSet::new();
    for commit in commits {
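        // TODO: `tx` is moved into every spawned task here; rewriting commits
        // will need a shared handle or a single transaction that applies the
        // collected results afterwards.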
        command_futures.spawn(rewrite_commit(tx, commit, shell_command));
    }

    while let Some(done) = command_futures.join_next().await {
        // Ignore send errors; they just mean the receiver already hung up.
        let _ = sender.send(done.unwrap());
    }
    Ok(())
}

/// Rewrite a single `StoredCommit`.
async fn rewrite_commit(
    tx: Transaction,
    stored_commit: StoredCommit,
    shell_command: &str,
) -> RunJobResult {
    let commit = &stored_commit.commit;
    // TODO: redirect stdout/stderr into the job's `output` directory created
    // by `create_working_copy_paths`.
    let mut command_builder = Command::new("sh");
    command_builder
        .args(["-c", shell_command])
        // TODO: relativize
        .env("JJ_PATH", &stored_commit.path);
    let mut paths = vec![];
    // Paths modified in parent commits in the set should also be updated in
    // this commit.
    for parent_id in commit.parent_ids() {
        if let Some(parent_paths) = commit_paths.get(parent_id) {
            paths.extend_from_slice(parent_paths);
        }
    }
    let parent_tree = commit.parent_tree(tx.repo())?;
    let tree = commit.tree()?;
    let mut diff_stream = parent_tree.diff_stream(&tree, &EverythingMatcher);
    while let Some((repo_path, diff)) = diff_stream.next().await {
        let (_before, after) = diff?;
        for term in after.into_iter().flatten() {
            if let TreeValue::File { id, executable: _ } = term {
                file_ids.insert((repo_path.clone(), id));
                paths.push(repo_path.clone());
            }
        }
    }
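    // TODO: `commit_paths` and `file_ids` are not defined in this hunk yet;
    // once they are threaded through, build the new tree from the collected
    // file ids and return `RunJobResult::Success { rewritten_id, new_tree }`.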
}
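
The call to `create_working_copies` in `cmd_run` below has no definition in this diff. A rough sketch of what it could look like, reusing the helpers above (the temp-dir base path and the `std::io::Error` return type are assumptions, not part of the commit):

/// Hypothetical helper: materialize one directory per commit and pair it with
/// the commit. Checking out the tree contents is still TODO.
fn create_working_copies(commits: Vec<Commit>) -> Result<Vec<StoredCommit>, std::io::Error> {
    // Assumption: the real implementation would live under the workspace's
    // `.jj/run/` directory; a temp dir stands in for it here.
    let base_path = std::env::temp_dir().join("jj-run");
    std::fs::create_dir_all(&base_path)?;
    let mut stored_commits = Vec::new();
    for commit in commits {
        // Name the per-commit directory after its tree so a caching layer
        // could later reuse it.
        let path = base_path.join(to_wc_name(commit.tree_id()));
        std::fs::create_dir_all(&path)?;
        let _dirs = create_working_copy_paths(&path)?;
        stored_commits.push(StoredCommit { commit, path });
    }
    Ok(stored_commits)
}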

/// Run a command across a set of revisions.
///
@@ -50,7 +50,7 @@ pub struct RunArgs {

pub fn cmd_run(ui: &mut Ui, command: &CommandHelper, args: &RunArgs) -> Result<(), CommandError> {
    let workspace_command = command.workspace_helper(ui)?;
    let _resolved_commits: Vec<_> = workspace_command
    let resolved_commits: Vec<_> = workspace_command
        .parse_union_revsets(&args.revisions)?
        .evaluate_to_commits()
        .try_collect()?;
@@ -66,8 +66,212 @@ pub fn cmd_run(ui: &mut Ui, command: &CommandHelper, args: &RunArgs) -> Result<(
        // Fallback to a single user-visible job.
        .unwrap_or(1usize);

    let repo = workspace_command.repo();
    let cache_backend = repo.working_copy_store();
    let _wc_copies = cache_backend.get_or_create_stores(_resolved_commits)?;
    let (sender, receiver) = std::sync::mpsc::channel();
    // let repo = workspace_command.repo();
    // let cache_backend = repo.working_copy_store();
    // let _wc_copies = cache_backend.get_or_create_stores(_resolved_commits)?;
    // Toposort the commits.
    let topo_sorted_commits = topo_order_forward_ok(
        resolved_commits.to_vec(),
        |c: &Commit| c.id().clone(),
        |c: &Commit| c.parent_ids().to_vec(),
    )?;
    let stored_commits = create_working_copies(topo_sorted_commits)?;

    let tx = workspace_command.start_transaction();
    // Start all the jobs.
    run_inner(tx, sender, jobs, &args.shell_command, &stored_commits).block_on()?;

    // Wait until we have all results.
    loop {
        let result = receiver.recv();
        if result.is_err() {
            // All senders disconnected, so every job has finished.
            tracing::debug!("all jobs have finished");
            break;
        }
        match result.unwrap() {
            // TODO: collect the results for the rewrite step below.
            RunJobResult::Success {
                rewritten_id: _,
                new_tree: _,
            } => {}
            RunJobResult::Failure(_status) => {}
        }
    }
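    // Adapted from `fix`: rewrite the trees of all descendants. Note that
    // `root_commits`, `commit_paths`, and `formatted` still need to be
    // populated from the job results collected above.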
    tx.mut_repo().transform_descendants(
        command.settings(),
        root_commits.iter().ids().cloned().collect_vec(),
        |mut rewriter| {
            let paths = commit_paths.get(rewriter.old_commit().id()).unwrap();
            let old_tree = rewriter.old_commit().tree()?;
            let mut tree_builder = MergedTreeBuilder::new(old_tree.id().clone());
            for path in paths {
                let old_value = old_tree.path_value(path);
                let new_value = old_value.map(|old_term| {
                    if let Some(TreeValue::File { id, executable }) = old_term {
                        if let Some(new_id) = formatted.get(&(path, id)) {
                            Some(TreeValue::File {
                                id: new_id.clone(),
                                executable: *executable,
                            })
                        } else {
                            old_term.clone()
                        }
                    } else {
                        old_term.clone()
                    }
                });
                if new_value != old_value {
                    tree_builder.set_or_remove(path.clone(), new_value);
                }
            }
            let new_tree = tree_builder.write_tree(rewriter.mut_repo().store())?;
            let builder = rewriter.reparent(command.settings())?;
            builder.set_tree_id(new_tree).write()?;
            Ok(())
        },
    )?;

    // TODO: drive the jobs through the planned workqueue (see the TODO in
    // `cli/Cargo.toml`) instead of the ad-hoc `JoinSet` in `run_inner`.
    Err(user_error("This is a stub, do not use"))
}
