Skip to content

Commit

Permalink
feat(hydroflow_plus): add ability to have staged flows inside unit tests
Browse files Browse the repository at this point in the history
Whenever a Hydroflow+ program is compiled, it depends on a generated `__staged` module, which contains the entire contents of the crate but with every type / function made `pub` and exported, so that the compiled UDFs can resolve local references appropriately.

Previously, we would not do this for `#[cfg(test)]` modules, since they may use `dev-dependencies` and therefore the generated module may fail to compile when not in test mode. To solve this, when running a unit test (marked with `hydroflow_plus::deploy::init_test()`) that uses trybuild, we emit a version of the `__staged` module with `#[cfg(test)]` modules included _into the generated trybuild sources_ because we can guarantee via trybuild that the appropriate `dev-dependencies` are available.

This by itself allows crates depending on `hydroflow_plus` to have local unit tests with Hydroflow+ logic inside them. But we also want to use this support for unit tests inside `hydroflow_plus` itself. To enable that, we eliminate the `hydroflow_plus_deploy` crate and move its contents directly to `hydroflow_plus` itself so that we can access the trybuild machinery without incurring a circular dependency.

Also fixes #1408
  • Loading branch information
shadaj committed Sep 14, 2024
1 parent 8a80931 commit ae1b6a7
Show file tree
Hide file tree
Showing 69 changed files with 369 additions and 602 deletions.
1 change: 0 additions & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ jobs:
hydroflow hydroflow_lang hydroflow_macro hydroflow_plus
hydroflow_datalog hydroflow_datalog_core
hydro_deploy hydro_cli hydroflow_deploy_integration
hydroflow_plus_deploy
stageleft stageleft_macro stageleft_tool
multiplatform_test
env:
Expand Down
30 changes: 15 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ members = [
"hydro_deploy/hydro_cli",
"hydro_deploy/hydro_cli_examples",
"hydro_deploy/hydroflow_deploy_integration",
"hydro_deploy/hydroflow_plus_deploy",
"hydroflow",
"hydroflow_datalog",
"hydroflow_datalog_core",
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/hydroflow_plus/quickstart/clusters.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ To deploy this application, we must set up the Hydro Deploy configuration as bef
use std::cell::RefCell;

use hydro_deploy::{Deployment, HydroflowCrate};
use hydroflow_plus_deploy::TrybuildHost;
use hydroflow_plus::deploy::TrybuildHost;

#[tokio::main]
async fn main() {
Expand Down
1 change: 0 additions & 1 deletion hydro_deploy/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,3 @@ tempfile = "3.0.0"
tokio = { version = "1.29.0", features = [ "full" ] }
tokio-stream = { version = "0.1.3", default-features = false }
tokio-util = { version = "0.7.5", features = [ "compat", "io-util" ] }

358 changes: 0 additions & 358 deletions hydro_deploy/hydroflow_plus_deploy/CHANGELOG.md

This file was deleted.

35 changes: 0 additions & 35 deletions hydro_deploy/hydroflow_plus_deploy/Cargo.toml

This file was deleted.

3 changes: 0 additions & 3 deletions hydro_deploy/hydroflow_plus_deploy/build.rs

This file was deleted.

26 changes: 0 additions & 26 deletions hydro_deploy/hydroflow_plus_deploy/src/lib.rs

This file was deleted.

18 changes: 14 additions & 4 deletions hydroflow_plus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ workspace = true
path = "src/lib.rs"

[features]
default = ["deploy"]
diagnostics = [ "hydroflow_lang/diagnostics" ]
deploy_integration = [ "hydroflow/deploy_integration" ]
stageleft_devel = []
deploy = [ "hydroflow/deploy_integration", "dep:hydro_deploy", "dep:trybuild-internals-api", "dep:toml", "dep:prettyplease" ]

[dependencies]
quote = "1.0.35"
Expand All @@ -25,15 +26,24 @@ proc-macro2 = "1.0.74"
proc-macro-crate = "1.0.0"
hydroflow = { path = "../hydroflow", version = "^0.9.0", default-features = false }
hydroflow_lang = { path = "../hydroflow_lang", version = "^0.9.0" }
serde = "1.0.197"
serde = { version = "1.0.197", features = [ "derive" ] }
bincode = "1.3.1"
tokio = { version = "1.29.0", features = [ "full" ] }
stageleft = { path = "../stageleft", version = "^0.4.0" }

# added to workaround `cargo smart-release` https://github.com/Byron/cargo-smart-release/issues/16
stageleft_tool = { path = "../stageleft_tool", version = "^0.3.0", optional = true }
nameof = "1.0.0"
sha2 = "0.10.0"
stageleft_tool = { path = "../stageleft_tool", version = "^0.3.0" }
hydro_deploy = { path = "../hydro_deploy/core", version = "^0.9.0", optional = true }
prettyplease = { version = "0.2.0", features = [ "verbatim" ], optional = true }
toml = { version = "0.8.0", optional = true }
trybuild-internals-api = { version = "1.0.99", optional = true }

[build-dependencies]
stageleft_tool = { path = "../stageleft_tool", version = "^0.3.0" }

[dev-dependencies]
insta = "1.39"
hydro_deploy = { path = "../hydro_deploy/core", version = "^0.9.0" }
async-ssh2-lite = { version = "0.5.0", features = ["vendored-openssl"] }
ctor = "0.2.8"
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,27 @@ use hydro_deploy::hydroflow_crate::ports::{
use hydro_deploy::hydroflow_crate::tracing_options::TracingOptions;
use hydro_deploy::hydroflow_crate::HydroflowCrateService;
use hydro_deploy::{CustomService, Deployment, Host, HydroflowCrate};
use hydroflow_plus::deploy::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
use hydroflow_plus::futures::SinkExt;
use hydroflow_plus::lang::graph::HydroflowGraph;
use hydroflow_plus::util::deploy::ConnectedSink;
use nameof::name_of;
use serde::Serialize;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use stageleft::{Quoted, RuntimeData};
use syn::visit_mut::VisitMut;
use tokio::sync::RwLock;

use super::HydroflowPlusMeta;
use crate::deploy_runtime::*;
use crate::trybuild::{compile_graph_trybuild, create_trybuild};
use trybuild_internals_api::path;

use super::deploy_runtime::*;
use super::trybuild::{compile_graph_trybuild, create_trybuild};
use super::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
use crate::futures::SinkExt;
use crate::lang::graph::HydroflowGraph;
use crate::util::deploy::ConnectedSink;

#[derive(Default, Serialize, Deserialize)]
pub struct HydroflowPlusMeta {
pub clusters: HashMap<usize, Vec<u32>>,
pub cluster_id: Option<u32>,
pub subgraph_id: usize,
}

pub struct HydroDeploy {}

Expand Down Expand Up @@ -413,37 +421,24 @@ impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
fn as_bytes_sink(
&self,
key: usize,
) -> impl Future<
Output = Pin<
Box<dyn hydroflow_plus::futures::Sink<hydroflow_plus::bytes::Bytes, Error = Error>>,
>,
> + 'a {
) -> impl Future<Output = Pin<Box<dyn crate::futures::Sink<crate::bytes::Bytes, Error = Error>>>> + 'a
{
let port = self.raw_port(key);
async move {
let sink = port.connect().await.into_sink();
Box::pin(sink)
as Pin<
Box<
dyn hydroflow_plus::futures::Sink<
hydroflow_plus::bytes::Bytes,
Error = Error,
>,
>,
>
Box::pin(sink) as Pin<Box<dyn crate::futures::Sink<crate::bytes::Bytes, Error = Error>>>
}
}

fn as_bincode_sink<T: Serialize + 'static>(
&self,
key: usize,
) -> impl Future<Output = Pin<Box<dyn hydroflow_plus::futures::Sink<T, Error = Error>>>> + 'a
{
) -> impl Future<Output = Pin<Box<dyn crate::futures::Sink<T, Error = Error>>>> + 'a {
let port = self.raw_port(key);
async move {
let sink = port.connect().await.into_sink();
Box::pin(sink.with(|item| async move {
Ok(hydroflow_plus::bincode::serialize(&item).unwrap().into())
})) as Pin<Box<dyn hydroflow_plus::futures::Sink<T, Error = Error>>>
Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }))
as Pin<Box<dyn crate::futures::Sink<T, Error = Error>>>
}
}
}
Expand Down Expand Up @@ -732,6 +727,44 @@ fn clean_name_hint(name_hint: &str) -> String {
.replace(")", "")
}

// TODO(shadaj): has to be public due to stageleft limitations
#[doc(hidden)]
pub struct ReplaceCrateNameWithStaged {
pub crate_name: String,
}

impl VisitMut for ReplaceCrateNameWithStaged {
fn visit_type_path_mut(&mut self, i: &mut syn::TypePath) {
if let Some(first) = i.path.segments.first() {
if first.ident == self.crate_name {
let tail = i.path.segments.iter().skip(1).collect::<Vec<_>>();
*i = syn::parse_quote!(crate::__staged #(::#tail)*);
}
}

syn::visit_mut::visit_type_path_mut(self, i);
}
}

// TODO(shadaj): has to be public due to stageleft limitations
#[doc(hidden)]
pub struct ReplaceCrateWithOrig {
pub crate_name: String,
}

impl VisitMut for ReplaceCrateWithOrig {
fn visit_item_use_mut(&mut self, i: &mut syn::ItemUse) {
if let syn::UseTree::Path(p) = &mut i.tree {
if p.ident == "crate" {
p.ident = syn::Ident::new(&self.crate_name, p.ident.span());
i.leading_colon = Some(Default::default());
}
}

syn::visit_mut::visit_item_use_mut(self, i);
}
}

fn create_graph_trybuild(
graph: HydroflowGraph,
extra_stmts: Vec<syn::Stmt>,
Expand All @@ -740,15 +773,44 @@ fn create_graph_trybuild(
String,
(std::path::PathBuf, std::path::PathBuf, Option<Vec<String>>),
) {
let source_ast = compile_graph_trybuild(graph, extra_stmts);

let source_dir = trybuild_internals_api::cargo::manifest_dir().unwrap();
let source_manifest = trybuild_internals_api::dependencies::get_manifest(&source_dir).unwrap();
let crate_name = &source_manifest.package.name.to_string().replace("-", "_");
let source = prettyplease::unparse(&source_ast)
.to_string()
.replace(crate_name, &format!("{crate_name}::__staged"))
.replace("crate::__staged", &format!("{crate_name}::__staged"));

let is_test = super::trybuild::IS_TEST.load(std::sync::atomic::Ordering::Relaxed);

let mut generated_code = compile_graph_trybuild(graph, extra_stmts);

ReplaceCrateNameWithStaged {
crate_name: crate_name.clone(),
}
.visit_file_mut(&mut generated_code);

let mut inlined_staged = stageleft_tool::gen_staged_trybuild(
&path!(source_dir / "src" / "lib.rs"),
crate_name.clone(),
is_test,
);

ReplaceCrateWithOrig {
crate_name: crate_name.clone(),
}
.visit_file_mut(&mut inlined_staged);

let source = prettyplease::unparse(&syn::parse_quote! {
#generated_code

#[allow(
unused,
ambiguous_glob_reexports,
clippy::suspicious_else_formatting,
unexpected_cfgs,
reason = "generated code"
)]
pub mod __staged {
#inlined_staged
}
});

let mut hasher = Sha256::new();
hasher.update(&source);
Expand All @@ -763,7 +825,7 @@ fn create_graph_trybuild(
hash
};

let trybuild_created = create_trybuild(&source, &bin_name).unwrap();
let trybuild_created = create_trybuild(&source, &bin_name, is_test).unwrap();
(bin_name, trybuild_created)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use hydroflow_plus::util::deploy::{
ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged, DeployPorts,
};
use stageleft::{q, Quoted, RuntimeData};

use crate::HydroflowPlusMeta;
use super::HydroflowPlusMeta;
use crate::util::deploy::{
ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged, DeployPorts,
};

pub fn cluster_members(
cli: RuntimeData<&DeployPorts<HydroflowPlusMeta>>,
Expand Down
File renamed without changes.
Loading

0 comments on commit ae1b6a7

Please sign in to comment.