-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #17 from washingtonpost/ELEX-3031-voterflow-models 🎉
ELEX-3031: Potential Voterflow Solvers
- Loading branch information
Showing
8 changed files
with
598 additions
and
26 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,28 +1,90 @@ | ||
import logging | ||
import warnings | ||
|
||
import cvxpy as cp | ||
import numpy as np | ||
|
||
from elexsolver.logging import initialize_logging | ||
from elexsolver.TransitionSolver import TransitionSolver | ||
|
||
initialize_logging() | ||
|
||
LOG = logging.getLogger(__name__) | ||
|
||
|
||
class TransitionMatrixSolver: | ||
def __init__(self): | ||
self.transition_matrix = None | ||
class TransitionMatrixSolver(TransitionSolver): | ||
""" | ||
Matrix regression transition solver using CVXPY. | ||
""" | ||
|
||
def __init__(self, strict: bool = True, lam: float | None = None): | ||
""" | ||
Parameters | ||
---------- | ||
strict : bool, default True | ||
If `True`, solution will be constrainted so that all coefficients are >= 0, | ||
<= 1, and the sum of each row equals 1. | ||
lam : float, optional | ||
`lam != 0` will enable L2 regularization (Ridge). | ||
""" | ||
super().__init__() | ||
self._strict = strict | ||
self._lambda = lam | ||
|
||
@staticmethod | ||
def __get_constraint(X, strict): | ||
def __get_constraints(coef: np.ndarray, strict: bool) -> list: | ||
if strict: | ||
return [cp.sum(X, axis=1) == 1] | ||
return [cp.sum(X, axis=1) <= 1.1, cp.sum(X, axis=1) >= 0.9] | ||
|
||
def __solve(self, A, B, strict): | ||
transition_matrix = cp.Variable((A.shape[1], B.shape[1])) | ||
loss_function = cp.norm(A @ transition_matrix - B, "fro") | ||
objective = cp.Minimize(loss_function) | ||
constraint = TransitionMatrixSolver.__get_constraint(transition_matrix, strict) | ||
problem = cp.Problem(objective, constraint) | ||
problem.solve() | ||
return [0 <= coef, coef <= 1, cp.sum(coef, axis=1) == 1] | ||
return [cp.sum(coef, axis=1) <= 1.1, cp.sum(coef, axis=1) >= 0.9] | ||
|
||
def __standard_objective(self, A: np.ndarray, B: np.ndarray, beta: np.ndarray) -> cp.Minimize: | ||
loss_function = cp.norm(A @ beta - B, "fro") | ||
return cp.Minimize(loss_function) | ||
|
||
def __ridge_objective(self, A: np.ndarray, B: np.ndarray, beta: np.ndarray) -> cp.Minimize: | ||
# Based on https://www.cvxpy.org/examples/machine_learning/ridge_regression.html | ||
lam = cp.Parameter(nonneg=True, value=self._lambda) | ||
loss_function = cp.pnorm(A @ beta - B, p=2) ** 2 | ||
regularizer = cp.pnorm(beta, p=2) ** 2 | ||
return cp.Minimize(loss_function + lam * regularizer) | ||
|
||
def __solve(self, A: np.ndarray, B: np.ndarray, weights: np.ndarray) -> np.ndarray: | ||
transition_matrix = cp.Variable((A.shape[1], B.shape[1]), pos=True) | ||
Aw = np.dot(weights, A) | ||
Bw = np.dot(weights, B) | ||
|
||
if self._lambda is None or self._lambda == 0: | ||
objective = self.__standard_objective(Aw, Bw, transition_matrix) | ||
else: | ||
objective = self.__ridge_objective(Aw, Bw, transition_matrix) | ||
|
||
constraints = TransitionMatrixSolver.__get_constraints(transition_matrix, self._strict) | ||
problem = cp.Problem(objective, constraints) | ||
|
||
with warnings.catch_warnings(): | ||
warnings.simplefilter("error") | ||
try: | ||
problem.solve(solver=cp.CLARABEL) | ||
except (UserWarning, cp.error.SolverError) as e: | ||
raise RuntimeError(e) from e | ||
|
||
return transition_matrix.value | ||
|
||
def fit(self, A, B, strict=False): | ||
transition_matrix = self.__solve(A, B, strict) | ||
self.transition_matrix = transition_matrix | ||
def fit(self, X: np.ndarray, Y: np.ndarray, sample_weight: np.ndarray | None = None) -> np.ndarray: | ||
self._check_any_element_nan_or_inf(X) | ||
self._check_any_element_nan_or_inf(Y) | ||
self._check_for_zero_units(X) | ||
self._check_for_zero_units(Y) | ||
|
||
if not isinstance(X, np.ndarray): | ||
X = X.to_numpy() | ||
if not isinstance(Y, np.ndarray): | ||
Y = Y.to_numpy() | ||
|
||
if X.shape[0] != Y.shape[0]: | ||
raise ValueError(f"Number of units in X ({X.shape[0]}) != number of units in Y ({Y.shape[0]}).") | ||
|
||
weights = self._check_and_prepare_weights(X, Y, sample_weight) | ||
|
||
def predict(self, A): | ||
return A @ self.transition_matrix | ||
self.coefficients = self.__solve(X, Y, weights) | ||
return self |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
import logging | ||
|
||
import numpy as np | ||
|
||
from elexsolver.LinearSolver import LinearSolver | ||
from elexsolver.logging import initialize_logging | ||
|
||
initialize_logging() | ||
|
||
LOG = logging.getLogger(__name__) | ||
|
||
|
||
class TransitionSolver(LinearSolver): | ||
""" | ||
Abstract class for transition solvers. | ||
""" | ||
|
||
def __init__(self): | ||
""" | ||
After model-fit, `self.coefficients` will contain | ||
the solved coefficients, an np.ndarray matrix of float of shape | ||
(number of columns in `X`) x (number of columns in `Y`). | ||
Each float represents the percent of how much of row x is part of column y. | ||
""" | ||
super().__init__() | ||
|
||
def fit(self, X: np.ndarray, Y: np.ndarray, sample_weight: np.ndarray | None = None): | ||
""" | ||
Parameters | ||
---------- | ||
X : np.ndarray matrix or pandas.DataFrame of int | ||
Must have the same number of rows as `Y` but can have any number of columns greater than the number of rows. | ||
Y : np.ndarray matrix or pandas.DataFrame of int | ||
Must have the same number of rows as `X` but can have any number of columns greater than the number of rows. | ||
sample_weight : list or np.ndarray or pandas.Series of int, optional | ||
Must have the same length (number of rows) as both `X` and `Y`. | ||
Returns | ||
------- | ||
`self` and populates `betas` with the beta coefficients determined by this solver. | ||
`betas` is an np.ndarray matrix of float of shape (number of columns in `X`) x (number of columns in `Y`). | ||
Each float represents the percent of how much of row x is part of column y. | ||
""" | ||
raise NotImplementedError | ||
|
||
def predict(self, X: np.ndarray) -> np.ndarray: | ||
""" | ||
Parameters | ||
---------- | ||
X : np.ndarray matrix or pandas.DataFrame of int | ||
Must have the same dimensions as the `X` supplied to `fit()`. | ||
Returns | ||
------- | ||
`Y_hat`, np.ndarray of float of the same shape as Y. | ||
""" | ||
if self.coefficients is None: | ||
raise RuntimeError("Solver must be fit before prediction can be performed.") | ||
|
||
self._check_any_element_nan_or_inf(X) | ||
|
||
return X @ self.coefficients | ||
|
||
def _check_for_zero_units(self, A: np.ndarray): | ||
""" | ||
If we have at least one unit whose columns are all zero, most if not all of our solvers will fail. | ||
""" | ||
if np.any(np.sum(A, axis=1) == 0): | ||
raise ValueError("Matrix cannot contain any rows (units) where all columns (things) are zero.") | ||
|
||
def _check_and_prepare_weights(self, X: np.ndarray, Y: np.ndarray, weights: np.ndarray | None) -> np.ndarray: | ||
""" | ||
If `weights` is not None, and `weights` has the same number of rows in both matrices `X` and `Y`, | ||
we'll rescale the weights by taking the square root after dividing them by their sum, | ||
then return a diagonal matrix containing these now-normalized weights. | ||
If `weights` is None, return a diagonal matrix of ones. | ||
Parameters | ||
---------- | ||
X : np.ndarray matrix of int (same number of rows as `Y`) | ||
Y : np.ndarray matrix of int (same number of rows as `X`) | ||
weights : np.ndarray of int of the shape (number of rows in `X` and `Y`, 1), optional | ||
""" | ||
|
||
if weights is not None: | ||
if len(weights) != X.shape[0] and len(weights) != Y.shape[0]: | ||
raise ValueError("weights must be the same length as the number of rows in X and Y.") | ||
if isinstance(weights, list): | ||
weights = np.array(weights).copy() | ||
elif not isinstance(weights, np.ndarray): | ||
# pandas.Series | ||
weights = weights.values.copy() | ||
return np.diag(np.sqrt(weights.flatten() / weights.sum())) | ||
|
||
return np.diag(np.ones((Y.shape[0],))) |
Oops, something went wrong.