diff --git a/.github/workflows/maturin.yml b/.github/workflows/maturin.yml new file mode 100644 index 0000000..1c3568a --- /dev/null +++ b/.github/workflows/maturin.yml @@ -0,0 +1,181 @@ +# This file is autogenerated by maturin v1.7.4 +# To update, run +# +# maturin generate-ci github +# +name: CI + +on: + push: + branches: + - main + - master + tags: + - '*' + pull_request: + workflow_dispatch: + +permissions: + contents: read + +jobs: + linux: + runs-on: ${{ matrix.platform.runner }} + strategy: + matrix: + platform: + - runner: ubuntu-latest + target: x86_64 + - runner: ubuntu-latest + target: x86 + - runner: ubuntu-latest + target: aarch64 + - runner: ubuntu-latest + target: armv7 + - runner: ubuntu-latest + target: s390x + - runner: ubuntu-latest + target: ppc64le + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: 3.x + - name: Build wheels + uses: PyO3/maturin-action@v1 + with: + target: ${{ matrix.platform.target }} + args: --release --out dist --find-interpreter + sccache: 'true' + manylinux: auto + - name: Upload wheels + uses: actions/upload-artifact@v4 + with: + name: wheels-linux-${{ matrix.platform.target }} + path: dist + + musllinux: + runs-on: ${{ matrix.platform.runner }} + strategy: + matrix: + platform: + - runner: ubuntu-latest + target: x86_64 + - runner: ubuntu-latest + target: x86 + - runner: ubuntu-latest + target: aarch64 + - runner: ubuntu-latest + target: armv7 + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: 3.x + - name: Build wheels + uses: PyO3/maturin-action@v1 + with: + target: ${{ matrix.platform.target }} + args: --release --out dist --find-interpreter + sccache: 'true' + manylinux: musllinux_1_2 + - name: Upload wheels + uses: actions/upload-artifact@v4 + with: + name: wheels-musllinux-${{ matrix.platform.target }} + path: dist + + windows: + runs-on: ${{ matrix.platform.runner }} + strategy: + matrix: + platform: + - runner: windows-latest + target: x64 + - runner: windows-latest + target: x86 + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: 3.x + architecture: ${{ matrix.platform.target }} + - name: Build wheels + uses: PyO3/maturin-action@v1 + with: + target: ${{ matrix.platform.target }} + args: --release --out dist --find-interpreter + sccache: 'true' + - name: Upload wheels + uses: actions/upload-artifact@v4 + with: + name: wheels-windows-${{ matrix.platform.target }} + path: dist + + macos: + runs-on: ${{ matrix.platform.runner }} + strategy: + matrix: + platform: + # - runner: macos-12 + # target: x86_64 + - runner: macos-14 + target: aarch64 + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: 3.x + - name: Build wheels + uses: PyO3/maturin-action@v1 + with: + target: ${{ matrix.platform.target }} + args: --release --out dist --find-interpreter + sccache: 'true' + - name: Upload wheels + uses: actions/upload-artifact@v4 + with: + name: wheels-macos-${{ matrix.platform.target }} + path: dist + + sdist: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Build sdist + uses: PyO3/maturin-action@v1 + with: + command: sdist + args: --out dist + - name: Upload sdist + uses: actions/upload-artifact@v4 + with: + name: wheels-sdist + path: dist + + release: + name: Release + runs-on: ubuntu-latest + if: ${{ startsWith(github.ref, 'refs/tags/') || github.event_name == 'workflow_dispatch' }} + needs: [linux, musllinux, windows, macos, sdist] + permissions: + # Use to sign the release artifacts + id-token: write + # Used to upload release artifacts + contents: write + # Used to generate artifact attestation + attestations: write + steps: + - uses: actions/download-artifact@v4 + - name: Generate artifact attestation + uses: actions/attest-build-provenance@v1 + with: + subject-path: 'wheels-*/*' + - name: Publish to PyPI + if: "startsWith(github.ref, 'refs/tags/')" + uses: PyO3/maturin-action@v1 + env: + MATURIN_PYPI_TOKEN: ${{ secrets.PYPI_TOKEN }} + with: + command: upload + args: --non-interactive --skip-existing wheels-*/* diff --git a/.github/workflows/publish_pypi.yaml b/.github/workflows/publish_pypi.yaml deleted file mode 100644 index f9daf47..0000000 --- a/.github/workflows/publish_pypi.yaml +++ /dev/null @@ -1,24 +0,0 @@ -name: Publish Python package to PyPI - -on: - push: - tags: - - v* - -jobs: - publish: - runs-on: ubuntu-latest - steps: - - name: Checkout repository - uses: actions/checkout@v4 - - name: Setup Python - uses: actions/setup-python@v5 - with: - python-version: 3.9 - cache: 'pip' - - name: Build source distribution - run: pip install build && python -m build - - name: Publish to PyPI - uses: pypa/gh-action-pypi-publish@release/v1 - with: - password: ${{ secrets.PYPI_TOKEN }} diff --git a/.github/workflows/ci.yaml b/.github/workflows/style_lint_test.yaml similarity index 96% rename from .github/workflows/ci.yaml rename to .github/workflows/style_lint_test.yaml index 9749b49..e864b25 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/style_lint_test.yaml @@ -4,7 +4,7 @@ on: push jobs: - ci: + style-lint-test: runs-on: ubuntu-latest steps: - name: Checkout repository diff --git a/.gitignore b/.gitignore index 576c037..938b5f1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,9 +1,82 @@ .envrc +.env +.test +temp* results figures -dist workloads* -*.egg-info -**/__pycache__ **/*.png +Cargo.lock + +/target + +# Byte-compiled / optimized / DLL files +__pycache__/ +.pytest_cache/ +*.py[cod] + +# C extensions +*.so + +# Distribution / packaging +.Python +.venv/ +env/ +bin/ +build/ +develop-eggs/ +dist/ +eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +include/ +man/ +venv/ +*.egg-info/ +.installed.cfg +*.egg + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt +pip-selfcheck.json + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.cache +nosetests.xml +coverage.xml + +# Translations +*.mo + +# Mr Developer +.mr.developer.cfg +.project +.pydevproject + +# Rope +.ropeproject + +# Django stuff: +*.log +*.pot + .DS_Store + +# Sphinx documentation +docs/_build/ + +# PyCharm +.idea/ + +# VSCode +.vscode/ + +# Pyenv +.python-version diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..f143f97 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "_lowtime_rs" +version = "0.2.0" # keep synced with __init__.py, pyproject.toml +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +name = "_lowtime_rs" +crate-type = ["cdylib"] + +[dependencies] +pyo3 = { version = "0.23.3", features = ["extension-module"] } +pyo3-log = "0.12.0" +log = "0.4" +ordered-float = { version = "4.0", default-features = false } +pathfinding = "4.11.0" diff --git a/lowtime/__init__.py b/lowtime/__init__.py index ee2edd1..cf277a0 100644 --- a/lowtime/__init__.py +++ b/lowtime/__init__.py @@ -14,4 +14,4 @@ """A library for solving the time-cost tradeoff problem.""" -__version__ = "0.2.0" +__version__ = "0.2.0" # keep synced with pyproject.toml, Cargo.toml diff --git a/lowtime/_lowtime_rs.pyi b/lowtime/_lowtime_rs.pyi new file mode 100644 index 0000000..7b19ab9 --- /dev/null +++ b/lowtime/_lowtime_rs.pyi @@ -0,0 +1,19 @@ +from __future__ import annotations + +import networkx as nx + +class PhillipsDessouky: + def __init__( + self, + fp_error: float, + node_ids: list[int] | nx.classes.reportviews.NodeView, + source_node_id: int, + sink_node_id: int, + edges_raw: list[ + tuple[ + tuple[int, int], + tuple[float, float, float, float], + ] + ], + ) -> None: ... + def find_min_cut(self) -> tuple[set[int], set[int]]: ... diff --git a/lowtime/py.typed b/lowtime/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/lowtime/solver.py b/lowtime/solver.py index 8697400..ab7a060 100644 --- a/lowtime/solver.py +++ b/lowtime/solver.py @@ -34,6 +34,7 @@ get_total_cost, ) from lowtime.exceptions import LowtimeFlowError +from lowtime import _lowtime_rs FP_ERROR = 1e-6 @@ -160,6 +161,35 @@ def __init__(self, dag: nx.DiGraph, attr_name: str = "op") -> None: self.aon_dag = dag self.unit_time = unit_time_candidates.pop() + def format_rust_inputs( + self, + dag: nx.DiGraph, + ) -> tuple[ + nx.classes.reportviews.NodeView, + list[ + tuple[ + tuple[int, int], + tuple[float, float, float, float], + ] + ], + ]: + """Convert Python-side nx.DiGraph into format compatible with Rust-side LowtimeGraph.""" + nodes = dag.nodes + edges = [] + for from_, to_, edge_attrs in dag.edges(data=True): + edges.append( + ( + (from_, to_), + ( + edge_attrs.get("capacity", 0), + edge_attrs.get("flow", 0), + edge_attrs.get("ub", 0), + edge_attrs.get("lb", 0), + ), + ) + ) + return nodes, edges + def run(self) -> Generator[IterationResult, None, None]: """Run the algorithm and yield a DAG after each iteration. @@ -231,6 +261,7 @@ def run(self) -> Generator[IterationResult, None, None]: ) self.annotate_capacities(critical_dag) + if logger.isEnabledFor(logging.DEBUG): logger.debug("Capacity DAG:") logger.debug( @@ -243,13 +274,22 @@ def run(self) -> Generator[IterationResult, None, None]: ) try: - s_set, t_set = self.find_min_cut(critical_dag) + nodes, edges = self.format_rust_inputs(critical_dag) + rust_runner = _lowtime_rs.PhillipsDessouky( + FP_ERROR, + nodes, + critical_dag.graph["source_node"], + critical_dag.graph["sink_node"], + edges, + ) + s_set, t_set = rust_runner.find_min_cut() except LowtimeFlowError as e: logger.info("Could not find minimum cut: %s", e.message) logger.info("Terminating PD iteration.") break cost_change = self.reduce_durations(critical_dag, s_set, t_set) + if cost_change == float("inf") or abs(cost_change) < FP_ERROR: logger.info("No further time reduction possible.") logger.info("Terminating PD iteration.") @@ -328,6 +368,10 @@ def reduce_durations( def find_min_cut(self, dag: nx.DiGraph) -> tuple[set[int], set[int]]: """Find the min cut of the DAG annotated with lower/upper bound flow capacities. + Note: this function is not used and instead accelerated by calling + rust_runner.find_min_cut. It is left here for reference in case someone wants + to modify the algorithm in Python for research. + Assumptions: - The capacity DAG is in AOA form. - The capacity DAG has been annotated with `lb` and `ub` attributes on edges, diff --git a/pyproject.toml b/pyproject.toml index a8296f6..0e33d5f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,14 +1,16 @@ [build-system] -requires = ["setuptools>=61.0.0", "wheel"] -build-backend = "setuptools.build_meta" +requires = ["maturin>=1.7,<2.0"] +build-backend = "maturin" [project] name = "lowtime" description = "A library for solving the time-cost tradeoff problem." +version="0.2.0" # keep synced with __init__.py, Cargo.toml readme = "README.md" authors = [ {name = "Jae-Won Chung", email = "jwnchung@umich.edu"}, {name = "Yile Gu", email = "yilegu@umich.edu"}, + {name = "Oh Jun Kweon", email = "ohjun@umich.edu"}, ] license = {text = "Apache 2.0"} classifiers = [ @@ -17,6 +19,7 @@ classifiers = [ "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", + "Programming Language :: Rust", ] keywords = ["optimization", "tradeoff", "DAG"] requires-python = ">=3.8" @@ -27,7 +30,10 @@ dependencies = [ "numpy", "networkx", ] -dynamic = ["version"] + +[tool.maturin] +module-name = "lowtime._lowtime_rs" +features = ["pyo3/extension-module"] [project.urls] Repository = "https://github.com/ml-energy/lowtime" @@ -37,16 +43,6 @@ lint = ["ruff", "black"] test = ["pytest"] dev = ["ruff", "black", "pytest", "tyro", "pandas", "pyright"] -[tool.setuptools] -include-package-data = false - -[tool.setuptools.dynamic] -version = {attr = "lowtime.__version__"} - -[tool.setuptools.packages.find] -include = ["lowtime*"] -exclude = ["examples", "scripts", "stubs", "tests"] - [tool.ruff] line-length = 120 diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..30d4fae --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,15 @@ +use pyo3::prelude::*; + +mod lowtime_graph; +mod phillips_dessouky; +mod utils; + +use phillips_dessouky::PhillipsDessouky; + + +#[pymodule] +fn _lowtime_rs(m: &Bound<'_, PyModule>) -> PyResult<()> { + pyo3_log::init(); // send Rust logs to Python logger + m.add_class::()?; + Ok(()) +} diff --git a/src/lowtime_graph.rs b/src/lowtime_graph.rs new file mode 100644 index 0000000..bbc89bc --- /dev/null +++ b/src/lowtime_graph.rs @@ -0,0 +1,158 @@ +use std::collections::{HashMap, HashSet}; +use ordered_float::OrderedFloat; +use pathfinding::directed::edmonds_karp::{ + SparseCapacity, + Edge, + EKFlows, + edmonds_karp, +}; + + +#[derive(Clone)] +pub struct LowtimeGraph { + pub node_ids: Vec, + pub source_node_id: u32, + pub sink_node_id: u32, + edges: HashMap>, + preds: HashMap>, + num_edges: usize, +} + +impl LowtimeGraph { + pub fn new(source_node_id: u32, sink_node_id: u32) -> Self { + LowtimeGraph { + node_ids: Vec::new(), + source_node_id, + sink_node_id, + edges: HashMap::new(), + preds: HashMap::new(), + num_edges: 0, + } + } + + pub fn of_python( + mut node_ids: Vec, + source_node_id: u32, + sink_node_id: u32, + edges_raw: Vec<((u32, u32), (f64, f64, f64, f64))>, + ) -> Self { + let mut graph = LowtimeGraph::new(source_node_id, sink_node_id); + node_ids.sort(); + graph.node_ids = node_ids.clone(); + + edges_raw.iter().for_each(|( + (from, to), + (capacity, flow, ub, lb), + )| { + graph.add_edge(*from, *to, LowtimeEdge::new( + OrderedFloat(*capacity), + *flow, + *ub, + *lb, + )) + }); + graph + } + + pub fn max_flow(&self) -> EKFlows> { + let edges_edmonds_karp = self.get_ek_preprocessed_edges(); + edmonds_karp::<_, _, _, SparseCapacity<_>>( + &self.node_ids, + &self.source_node_id, + &self.sink_node_id, + edges_edmonds_karp, + ) + } + + pub fn num_nodes(&self) -> usize { + self.node_ids.len() + } + + pub fn num_edges(&self) -> usize { + self.num_edges + } + + pub fn successors(&self, node_id: u32) -> Option> { + self.edges.get(&node_id).map(|succs| succs.keys()) + } + + pub fn predecessors(&self, node_id: u32) -> Option> { + self.preds.get(&node_id).map(|preds| preds.iter()) + } + + pub fn edges(&self) -> impl Iterator { + self.edges.iter().flat_map(|(from, inner)| { + inner.iter().map(move |(to, edge)| (from, to, edge)) + }) + } + + pub fn edges_mut(&mut self) -> impl Iterator { + self.edges.iter_mut().flat_map(|(from, inner)| { + inner.iter_mut().map(move |(to, edge)| (from, to, edge)) + }) + } + + pub fn add_node_id(&mut self, node_id: u32) -> () { + assert!(self.node_ids.last().unwrap() < &node_id, "New node ids must be larger than all existing node ids"); + self.node_ids.push(node_id) + } + + pub fn has_edge(&self, from: u32, to: u32) -> bool { + match self.edges.get(&from).and_then(|inner| inner.get(&to)) { + Some(_) => true, + None => false, + } + } + + pub fn get_edge(&self, from: u32, to: u32) -> &LowtimeEdge { + self.edges.get(&from) + .and_then(|inner| inner.get(&to)) + .expect(&format!("Edge {} to {} not found", from, to)) + } + + pub fn get_edge_mut(&mut self, from: u32, to: u32) -> &mut LowtimeEdge { + self.edges.get_mut(&from) + .and_then(|inner| inner.get_mut(&to)) + .expect(&format!("Edge {} to {} not found", from, to)) + } + + pub fn add_edge(&mut self, from: u32, to: u32, edge: LowtimeEdge) -> () { + self.edges.entry(from).or_insert_with(HashMap::new).insert(to, edge); + self.preds.entry(to).or_insert_with(HashSet::new).insert(from); + self.num_edges += 1; + } + + fn get_ek_preprocessed_edges(&self, ) -> Vec>> { + let mut processed_edges = Vec::with_capacity(self.num_edges); + processed_edges.extend( + self.edges.iter().flat_map(|(from, inner)| + inner.iter().map(|(to, edge)| + ((*from, *to), edge.capacity) + ))); + processed_edges + } +} + +#[derive(Clone)] +pub struct LowtimeEdge { + pub capacity: OrderedFloat, + pub flow: f64, + pub ub: f64, + pub lb: f64, +} + +impl LowtimeEdge { + pub fn new( + capacity: OrderedFloat, + flow: f64, + ub: f64, + lb: f64, + ) -> Self { + LowtimeEdge { + capacity, + flow, + ub, + lb, + } + } +} diff --git a/src/phillips_dessouky.rs b/src/phillips_dessouky.rs new file mode 100644 index 0000000..fde1183 --- /dev/null +++ b/src/phillips_dessouky.rs @@ -0,0 +1,307 @@ +use pyo3::prelude::*; + +use std::collections::{HashMap, HashSet, VecDeque}; +use log::{debug, error, Level}; +use ordered_float::OrderedFloat; + +use crate::lowtime_graph::{LowtimeGraph, LowtimeEdge}; + + +#[pyclass] +pub struct PhillipsDessouky { + dag: LowtimeGraph, + fp_error: f64, +} + +#[pymethods] +impl PhillipsDessouky { + #[new] + fn new( + fp_error: f64, + node_ids: Vec, + source_node_id: u32, + sink_node_id: u32, + edges_raw: Vec<((u32, u32), (f64, f64, f64, f64))>, + ) -> PyResult { + Ok(PhillipsDessouky { + dag: LowtimeGraph::of_python( + node_ids, + source_node_id, + sink_node_id, + edges_raw, + ), + fp_error, + }) + } + + /// Find the min cut of the DAG annotated with lower/upper bound flow capacities. + /// + /// Assumptions: + /// - The capacity DAG is in AOA form. + /// - The capacity DAG has been annotated with `lb` and `ub` attributes on edges, + /// representing the lower and upper bounds of the flow on the edge. + /// + /// Returns: + /// A tuple of (s_set, t_set) where s_set is the set of nodes on the source side + /// of the min cut and t_set is the set of nodes on the sink side of the min cut. + /// + /// Panics if: + /// - Any of the assumptions are not true. + /// - No feasible flow exists. + fn find_min_cut(&mut self) -> (HashSet, HashSet) { + // In order to solve max flow on edges with both lower and upper bounds, + // we first need to convert it to another DAG that only has upper bounds. + let mut unbound_dag: LowtimeGraph = self.dag.clone(); + + // For every edge, capacity = ub - lb. + unbound_dag.edges_mut().for_each(|(_from, _to, edge)| + edge.capacity = OrderedFloat(edge.ub - edge.lb) + ); + + // Add a new node s', which will become the new source node. + // We constructed the AOA DAG, so we know that node IDs are integers. + let s_prime_id = unbound_dag.node_ids.last().unwrap() + 1; + unbound_dag.add_node_id(s_prime_id); + + // For every node u in the original graph, add an edge (s', u) with capacity + // equal to the sum of all lower bounds of u's parents. + let orig_node_ids = &self.dag.node_ids; + for u in orig_node_ids.iter() { + let mut capacity = OrderedFloat(0.0); + if let Some(preds) = unbound_dag.predecessors(*u) { + capacity = OrderedFloat(preds.fold(0.0, |acc, pred_id| { + acc + unbound_dag.get_edge(*pred_id, *u).lb + })); + } + unbound_dag.add_edge(s_prime_id, *u, LowtimeEdge::new( + capacity, + 0.0, // flow + 0.0, // ub + 0.0, // lb + )); + } + + // Add a new node t', which will become the new sink node. + let t_prime_id = s_prime_id + 1; + unbound_dag.add_node_id(t_prime_id); + + // For every node u in the original graph, add an edge (u, t') with capacity + // equal to the sum of all lower bounds of u's children. + for u in orig_node_ids.iter() { + let mut capacity = OrderedFloat(0.0); + if let Some(succs) = unbound_dag.successors(*u) { + capacity = OrderedFloat(succs.fold(0.0, |acc, succ_id| { + acc + unbound_dag.get_edge(*u, *succ_id).lb + })); + } + unbound_dag.add_edge(*u, t_prime_id, LowtimeEdge::new( + capacity, + 0.0, // flow + 0.0, // ub + 0.0, // lb + )); + } + + if log::log_enabled!(Level::Debug) { + debug!("Unbound DAG"); + debug!("Number of nodes: {}", unbound_dag.num_nodes()); + debug!("Number of edges: {}", unbound_dag.num_edges()); + let total_capacity = unbound_dag.edges() + .fold(OrderedFloat(0.0), |acc, (_from, _to, edge)| acc + edge.capacity); + debug!("Sum of capacities: {}", total_capacity); + } + + // Add an edge from t to s with infinite capacity. + unbound_dag.add_edge( + unbound_dag.sink_node_id, + unbound_dag.source_node_id, + LowtimeEdge::new( + OrderedFloat(f64::INFINITY), // capacity + 0.0, // flow + 0.0, // ub + 0.0, // lb + ), + ); + + // Update source and sink on unbound_dag + unbound_dag.source_node_id = s_prime_id; + unbound_dag.sink_node_id = t_prime_id; + + // We're done with constructing the DAG with only flow upper bounds. + // Find the maximum flow on this DAG. + let (flows, _max_flow, _min_cut): ( + Vec<((u32, u32), OrderedFloat)>, + OrderedFloat, + Vec<((u32, u32), OrderedFloat)>, + ) = unbound_dag.max_flow(); + + if log::log_enabled!(Level::Debug) { + debug!("After first max flow"); + let total_flow = flows.iter() + .fold(OrderedFloat(0.0), |acc, ((_from, _to), flow)| acc + flow); + debug!("Sum of all flow values: {}", total_flow); + } + + // Convert flows to dict for faster lookup + let flow_dict = flows.iter().fold( + HashMap::new(), + |mut acc: HashMap>, ((from, to), flow)| { + acc.entry(*from) + .or_insert_with(HashMap::new) + .insert(*to, flow.into_inner()); + acc + } + ); + + // Check if residual graph is saturated. If so, we have a feasible flow. + if let Some(succs) = unbound_dag.successors(s_prime_id) { + for u in succs { + let flow = flow_dict.get(&s_prime_id) + .and_then(|inner| inner.get(u)) + .unwrap_or(&0.0); + let cap = unbound_dag.get_edge(s_prime_id, *u).capacity; + let diff = (flow - cap.into_inner()).abs(); + if diff > self.fp_error { + error!( + "s' -> {} unsaturated (flow: {}, capacity: {})", + u, + flow_dict[&s_prime_id][u], + unbound_dag.get_edge(s_prime_id, *u).capacity, + ); + panic!("ERROR: Max flow on unbounded DAG didn't saturate."); + } + } + } + if let Some(preds) = unbound_dag.predecessors(t_prime_id) { + for u in preds { + let flow = flow_dict.get(u) + .and_then(|inner| inner.get(&t_prime_id)) + .unwrap_or(&0.0); + let cap = unbound_dag.get_edge(*u, t_prime_id).capacity; + let diff = (flow - cap.into_inner()).abs(); + if diff > self.fp_error { + error!( + "{} -> t' unsaturated (flow: {}, capacity: {})", + u, + flow_dict[u][&t_prime_id], + unbound_dag.get_edge(*u, t_prime_id).capacity, + ); + panic!("ERROR: Max flow on unbounded DAG didn't saturate."); + } + } + } + + // We have a feasible flow. Construct a new residual graph with the same + // shape as the capacity DAG so that we can find the min cut. + // First, retrieve the flow amounts to the original capacity graph, where for + // each edge u -> v, the flow amount is `flow + lb`. + for (u, v, edge) in self.dag.edges_mut() { + let flow = flow_dict.get(u) + .and_then(|inner| inner.get(v)) + .unwrap_or(&0.0); + edge.flow = flow + edge.lb; + } + + // Construct a new residual graph (same shape as capacity DAG) with + // u -> v capacity `ub - flow` and v -> u capacity `flow - lb`. + let mut residual_graph = self.dag.clone(); + for (u, v, _dag_edge) in self.dag.edges() { + // Rounding small negative values to 0.0 avoids pathfinding::edmonds_karp + // from entering unreachable code. Has no impact on correctness in test runs. + let residual_uv_edge = residual_graph.get_edge_mut(*u, *v); + let mut uv_capacity = OrderedFloat(residual_uv_edge.ub - residual_uv_edge.flow); + if uv_capacity.into_inner().abs() < self.fp_error { + uv_capacity = OrderedFloat(0.0); + } + residual_uv_edge.capacity = uv_capacity; + + let mut vu_capacity = OrderedFloat(residual_uv_edge.flow - residual_uv_edge.lb); + if vu_capacity.into_inner().abs() < self.fp_error { + vu_capacity = OrderedFloat(0.0); + } + + match self.dag.has_edge(*v, *u) { + true => residual_graph.get_edge_mut(*v, *u).capacity = vu_capacity, + false => residual_graph.add_edge(*v, *u, LowtimeEdge::new( + vu_capacity, + 0.0, // flow + 0.0, // ub + 0.0, // lb + )), + } + } + + // Run max flow on the new residual graph. + let (flows, _max_flow, _min_cut): ( + Vec<((u32, u32), OrderedFloat)>, + OrderedFloat, + Vec<((u32, u32), OrderedFloat)>, + ) = residual_graph.max_flow(); + + // Convert flows to dict for faster lookup + let flow_dict = flows.iter().fold( + HashMap::new(), + |mut acc: HashMap>, ((from, to), flow)| { + acc.entry(*from) + .or_insert_with(HashMap::new) + .insert(*to, flow.into_inner()); + acc + } + ); + + // Add additional flow we get to the original graph + for (u, v, edge) in self.dag.edges_mut() { + edge.flow += *flow_dict.get(u) + .and_then(|inner| inner.get(v)) + .unwrap_or(&0.0); + edge.flow -= *flow_dict.get(v) + .and_then(|inner| inner.get(u)) + .unwrap_or(&0.0); + } + + // Construct the new residual graph. + let mut new_residual = self.dag.clone(); + for (u, v, edge) in self.dag.edges() { + new_residual.get_edge_mut(*u, *v).flow = edge.ub - edge.flow; + new_residual.add_edge(*v, *u, LowtimeEdge::new( + OrderedFloat(0.0), // capacity + edge.flow - edge.lb, // flow + 0.0, // ub + 0.0, // lb + )); + } + + if log::log_enabled!(Level::Debug) { + debug!("New residual graph"); + debug!("Number of nodes: {}", new_residual.num_nodes()); + debug!("Number of edges: {}", new_residual.num_edges()); + let total_flow = unbound_dag.edges() + .fold(0.0, |acc, (_from, _to, edge)| acc + edge.flow); + debug!("Sum of capacities: {}", total_flow); + } + + // Find the s-t cut induced by the second maximum flow above. + // Only following `flow > 0` edges, find the set of nodes reachable from + // source node. That's the s-set, and the rest is the t-set. + let mut s_set: HashSet = HashSet::new(); + let mut q: VecDeque = VecDeque::new(); + q.push_back(new_residual.source_node_id); + while let Some(cur_id) = q.pop_back() { + s_set.insert(cur_id); + if cur_id == new_residual.sink_node_id { + break; + } + if let Some(succs) = new_residual.successors(cur_id) { + for child_id in succs { + let flow = new_residual.get_edge(cur_id, *child_id).flow; + if !s_set.contains(child_id) && flow.abs() > self.fp_error { + q.push_back(*child_id); + } + } + } + } + let all_nodes: HashSet = new_residual.node_ids.iter().copied().collect(); + let t_set: HashSet = all_nodes.difference(&s_set).copied().collect(); + (s_set, t_set) + } +} diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..391583f --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,17 @@ +use std::time::{ + Instant, + Duration, +}; + +// This function is not used in the codebase, but it is left here +// to facilitate profiling during development. +pub fn get_duration(start: Instant, end: Instant) -> f64 { + let duration: Duration = end.duration_since(start); + let seconds = duration.as_secs(); + let subsec_nanos = duration.subsec_nanos(); + + let fractional_seconds = subsec_nanos as f64 / 1_000_000_000.0; + let total_seconds = seconds as f64 + fractional_seconds; + + return total_seconds; +}