Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improving flow rate calculation #71

Merged
merged 2 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ log/
diskcache_*
results_*
*.egg-info
.vscode
264 changes: 109 additions & 155 deletions CADETProcess/processModel/flowSheet.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from collections import defaultdict
from functools import wraps
from warnings import warn

import numpy as np
import sympy as sym
from addict import Dict

from CADETProcess import CADETProcessError
Expand Down Expand Up @@ -548,24 +546,48 @@ def set_output_state(self, unit, state):
self._output_states[unit] = output_state

def get_flow_rates(self, state=None):
"""Calculate flow rate for all connections.

Optionally, an additional output state can be passed to update the
current output states.
"""
Calculate the volumetric flow rate for all connections in the process.

Parameters
----------
state : Dict, optional
Output states
Updated flow rates and output states for process sections.
Default is None.

Returns
-------
flow_rates : Dict
Volumetric flow rate of each unit operation.
Dict
Volumetric flow rate for each unit operation.

Notes
-----
To calculate the flow rates, a system of equations is set up:

.. math::

Q_i = \sum_{j=1}^{n_{units}} q_{ji} = \sum_{j=1}^{n_{units}} Q_j * w_{ji},

where :math:`Q_i` is the total flow exiting unit :math:`i`, and :math:`w_{ij}`
is the percentile of the total flow of unit :math:`j` directed to unit
:math:`i`. If the unit is an `Inlet` or a `Cstr` with a given flow rate,
:math:`Q_i` is given and the system is simplified. This system is solved using
`numpy.linalg.solve`. Then, the individual flows :math:`q_{ji}` are extracted.

Raises
------
CADETProcessError
If flow sheet connectivity matrix is singular, indicating a potential issue
in flow sheet configuration.

References
----------
Forum discussion on flow rate calculation:
https://forum.cadet-web.de/t/improving-the-flowrate-calculation/795
"""
flow_rates = {
unit.name: unit.flow_rate for unit in (self.inlets + self.cstrs)
unit.name: unit.flow_rate
for unit in (self.inlets + self.cstrs)
if unit.flow_rate is not None
}

Expand All @@ -582,164 +604,98 @@ def get_flow_rates(self, state=None):
unit = self.units_dict[param_name]
output_states[unit] = list(value.ravel())

def list_factory():
return [0, 0, 0, 0]

destination_flow_rates = {
unit.name: defaultdict(list_factory) for unit in self.units
}
origin_flow_rates = {
unit.name: defaultdict(list_factory) for unit in self.units
}
n_units = self.number_of_units

for i in range(4):
solution = self.solve_flow_rates(flow_rates, output_states, i)
if solution is not None:
for unit_index, unit in enumerate(self.units):
for destination in self.connections[unit].destinations:
destination_index = self.get_unit_index(destination)
value = float(
solution[f'Q_{unit_index}_{destination_index}']
)
destination_flow_rates[unit.name][destination.name][i] = value
origin_flow_rates[destination.name][unit.name][i] = value
# Setup matrix with output states.
w_out = np.zeros((n_units, n_units))

flow_rates = Dict()
for unit in self.units:
for destination, flow_rate in destination_flow_rates[unit.name].items():
flow_rates[unit.name].destinations[destination] = np.array(flow_rate)
for origin, flow_rate in origin_flow_rates[unit.name].items():
flow_rates[unit.name].origins[origin] = np.array(flow_rate)
unit_index = self.get_unit_index(unit)
if unit.name in flow_rates:
w_out[unit_index, unit_index] = 1
else:
for origin in self.connections[unit]['origins']:
o_index = self.get_unit_index(origin)
local_d_index = self.connections[origin].destinations.index(unit)
w_out[unit_index, o_index] = output_states[origin][local_d_index]
w_out[unit_index, unit_index] += -1

# Check for a singular matrix before the loop
if np.linalg.cond(w_out) == np.inf:
raise CADETProcessError(
"Flow sheet connectivity matrix is singular, which may be due to "
"unconnected units or missing flow rates. Please ensure all units are "
"correctly connected and all necessary flow rates are set."
)

for unit in self.units:
if not isinstance(unit, Inlet):
flow_rate_in = np.sum(
list(flow_rates[unit.name].origins.values()), axis=0
)
flow_rates[unit.name].total_in = flow_rate_in
if not isinstance(unit, Outlet):
flow_rate_out = np.sum(
list(flow_rates[unit.name].destinations.values()), axis=0
# Solve system of equations for each polynomial coefficient
total_flow_rate_coefficents = np.zeros((4, n_units))
for i in range(4):
coeffs = np.array(list(flow_rates.values()), ndmin=2)[:, i]
if not np.any(coeffs):
continue

Q_vec = np.zeros(n_units)
for unit_name in flow_rates:
unit_index = self.get_unit_index(self.units_dict[unit_name])
Q_vec[unit_index] = flow_rates[unit_name][i]
try:
total_flow_rate_coefficents[i] = np.linalg.solve(w_out, Q_vec)
except np.linalg.LinAlgError:
raise CADETProcessError(
"Unexpected error in flow rate calculation. "
"Please check the flow sheet setup."
)
flow_rates[unit.name].total_out = flow_rate_out

return flow_rates
# w_out_help is the same as w_out but it contains the origin flow for every unit
w_out_help = np.zeros((n_units, n_units))

def solve_flow_rates(self, inlet_flow_rates, output_states, coeff=0):
"""Solve flow rates of system using sympy.
for unit in self.connections:
unit_index = self.get_unit_index(unit)
for origin in self.connections[unit]['origins']:
o_index = self.get_unit_index(origin)
local_d_index = self.connections[origin].destinations.index(unit)
w_out_help[unit_index, o_index] = output_states[origin][local_d_index]

Because a simple 'push' algorithm cannot be used when closed loops are
present in a FlowSheet (e.g. SMBs), sympy is used to set up and solve
the system of equations.
# Calculate total_in as a matrix in "one" step rather than iterating manually.
total_in_matrix = w_out_help @ total_flow_rate_coefficents.T

Parameters
----------
inlet_flow_rates: dict
Flow rates of Inlet UnitOperations.
output_states: dict
Output states of all UnitOperations.
coeff: int
Polynomial coefficient of flow rates to be solved.
# Generate output dict
return_flow_rates = Dict()
for index, unit in enumerate(self.units):
unit_solution_dict = Dict()

Returns
-------
solution : dict
Solution of the flow rates in the system
if not isinstance(unit, Inlet):
unit_solution_dict['total_in'] = list(total_in_matrix[index])

Notes
-----
Since dynamic flow rates can be described as cubic polynomials, the
flow rates are solved individually for all coefficients.
To comply to the interface, the flow rates are always computed for the first
(constant) coefficient. For higher orders, the function returns None if all
coefficients are zero.

The problem could definitely be solved more elegantly (and efficiently) by
using proper liner algebra.
"""
if len(inlet_flow_rates) == 0:
return None

coeffs = np.array(list(inlet_flow_rates.values()), ndmin=2)[:, coeff]
if coeff > 0 and not np.any(coeffs):
return None

# Setup lists for symbols
unit_total_flow_symbols = sym.symbols(
f'Q_total_0:{self.number_of_units}'
)
unit_inflow_symbols = []
unit_outflow_symbols = []

unit_total_flow_eq = []
unit_outflow_eq = []

# Setup symbolic equations
for unit_index, unit in enumerate(self.units):
if \
isinstance(unit, Inlet) or \
isinstance(unit, Cstr) and (
unit.flow_rate is not None
or
unit.name in inlet_flow_rates
):
unit_total_flow_eq.append(
sym.Add(
unit_total_flow_symbols[unit_index],
- float(inlet_flow_rates[unit.name][coeff])
)
)
else:
unit_i_inflow_symbols = []
if not isinstance(unit, Outlet):
unit_solution_dict['total_out'] = list(total_flow_rate_coefficents[:, index])

for origin in self.connections[unit].origins:
origin_index = self.get_unit_index(origin)
unit_i_inflow_symbols.append(
sym.symbols(f'Q_{origin_index}_{unit_index}')
)
if not isinstance(unit, Inlet):
unit_solution_dict['origins'] = Dict(
{
origin.name: list(
total_flow_rate_coefficents[:, self.get_unit_index(origin)]
* w_out_help[index, self.get_unit_index(origin)]
)
for origin in self.connections[unit].origins
}
)

symbols = (
*unit_i_inflow_symbols,
-unit_total_flow_symbols[unit_index]
if not isinstance(unit, Outlet):
unit_solution_dict['destinations'] = Dict(
{
destination.name: list(
total_flow_rate_coefficents[:, index]
* w_out_help[self.get_unit_index(destination), index]
)
for destination in self.connections[unit].destinations
}
)
unit_i_total_flow_eq = sym.Add(*symbols)

unit_inflow_symbols += unit_i_inflow_symbols
unit_total_flow_eq.append(unit_i_total_flow_eq)
return_flow_rates[unit.name] = unit_solution_dict

if not isinstance(unit, Outlet):
output_state = output_states[unit]
unit_i_outflow_symbols = []

for destination in self.connections[unit].destinations:
destination_index = self.get_unit_index(destination)
unit_i_outflow_symbols.append(
sym.symbols(f'Q_{unit_index}_{destination_index}')
)

unit_i_outflow_eq = [
sym.Add(
unit_i_outflow_symbols[dest],
-unit_total_flow_symbols[unit_index]*output_state[dest]
)
for dest in range(len(self.connections[unit].destinations))
]

unit_outflow_symbols += unit_i_outflow_symbols
unit_outflow_eq += unit_i_outflow_eq

# Solve system of equations
symbols = (
*unit_total_flow_symbols,
*unit_inflow_symbols,
*unit_outflow_symbols
)
symbols = set(symbols)

solution = sym.solve(unit_total_flow_eq + unit_outflow_eq, symbols)
solution = {str(key): value for key, value in solution.items()}

return solution
return return_flow_rates

def check_flow_rates(self, state=None):
flow_rates = self.get_flow_rates(state)
Expand All @@ -750,9 +706,7 @@ def check_flow_rates(self, state=None):
continue

if not np.all(q.total_in == q.total_out):
raise CADETProcessError(
f"Unbalanced flow rate for unit '{unit}'."
)
raise CADETProcessError(f"Unbalanced flow rate for unit '{unit}'.")

@property
def feed_inlets(self):
Expand Down
Loading
Loading