From 4280565e0e3e897779a8e6337ec15695b4de24e1 Mon Sep 17 00:00:00 2001
From: GitHub Action <52708150+marcpinet@users.noreply.github.com>
Date: Tue, 3 Dec 2024 19:25:03 +0100
Subject: [PATCH] feat(ensemble): add XGBoost

---
 neuralnetlib/ensemble.py | 218 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 218 insertions(+)

diff --git a/neuralnetlib/ensemble.py b/neuralnetlib/ensemble.py
index 99e0bf8..262b293 100644
--- a/neuralnetlib/ensemble.py
+++ b/neuralnetlib/ensemble.py
@@ -593,3 +593,221 @@ def predict_proba(self, X):
         proba = self._sigmoid(predictions)
 
         return np.vstack([1 - proba, proba]).T
+
+
+
+class XGBoostObjective(Enum):
+    REG_SQUAREDERROR = "reg:squarederror"
+    BINARY_LOGISTIC = "binary:logistic"
+
+class XGBoostNode:
+    def __init__(self):
+        self.feature_idx: int = None
+        self.threshold: float = None
+        self.left: 'XGBoostNode' = None
+        self.right: 'XGBoostNode' = None
+        self.value: float = None
+        self.gain: float = 0.0
+        self.cover: float = 0.0
+
+class XGBoostTree:
+    def __init__(self, max_depth: int = 6, min_child_weight: float = 1.0,
+                 lambda_: float = 1.0, gamma: float = 0.0):
+        self.max_depth = max_depth
+        self.min_child_weight = min_child_weight
+        self.lambda_ = lambda_
+        self.gamma = gamma
+        self.root = None
+
+    def _calc_leaf_value(self, grad: np.ndarray, hess: np.ndarray) -> float:
+        return -np.sum(grad) / (np.sum(hess) + self.lambda_)
+
+    def _calc_gain(self, grad: np.ndarray, hess: np.ndarray) -> float:
+        # Similarity score G^2 / (H + lambda); the conventional 1/2 factor
+        # is omitted, which only rescales the gamma threshold.
+        G, H = np.sum(grad), np.sum(hess)
+        return (G * G) / (H + self.lambda_)
+
+    def _find_best_split(self, X: np.ndarray, grad: np.ndarray, hess: np.ndarray) -> tuple:
+        best_gain = 0.0
+        best_feature_idx = None
+        best_threshold = None
+        total_gain = self._calc_gain(grad, hess)
+        n_features = X.shape[1]
+
+        for feature_idx in range(n_features):
+            feature_values = X[:, feature_idx]
+            unique_values = np.unique(feature_values)
+
+            if len(unique_values) <= 1:
+                continue
+
+            thresholds = (unique_values[:-1] + unique_values[1:]) / 2
+
+            for threshold in thresholds:
+                left_mask = feature_values <= threshold
+                right_mask = ~left_mask
+
+                if (np.sum(hess[left_mask]) < self.min_child_weight or
+                    np.sum(hess[right_mask]) < self.min_child_weight):
+                    continue
+
+                left_gain = self._calc_gain(grad[left_mask], hess[left_mask])
+                right_gain = self._calc_gain(grad[right_mask], hess[right_mask])
+                gain = left_gain + right_gain - total_gain - self.gamma
+
+                if gain > best_gain:
+                    best_gain = gain
+                    best_feature_idx = feature_idx
+                    best_threshold = threshold
+
+        return best_feature_idx, best_threshold, best_gain
+
+    def _build_tree(self, X: np.ndarray, grad: np.ndarray, hess: np.ndarray,
+                    depth: int = 0) -> XGBoostNode:
+        node = XGBoostNode()
+        node.cover = np.sum(hess)
+
+        if depth >= self.max_depth:
+            node.value = self._calc_leaf_value(grad, hess)
+            return node
+
+        feature_idx, threshold, gain = self._find_best_split(X, grad, hess)
+
+        if feature_idx is None or gain <= 0:
+            node.value = self._calc_leaf_value(grad, hess)
+            return node
+
+        left_mask = X[:, feature_idx] <= threshold
+        right_mask = ~left_mask
+
+        node.feature_idx = feature_idx
+        node.threshold = threshold
+        node.gain = gain
+        node.left = self._build_tree(X[left_mask], grad[left_mask],
+                                     hess[left_mask], depth + 1)
+        node.right = self._build_tree(X[right_mask], grad[right_mask],
+                                      hess[right_mask], depth + 1)
+
+        return node
+
+    def _predict_sample(self, x: np.ndarray, node: XGBoostNode) -> float:
+        if node.value is not None:
+            return node.value
+
+        if x[node.feature_idx] <= node.threshold:
+            return self._predict_sample(x, node.left)
+        return self._predict_sample(x, node.right)
+
+    def fit(self, X: np.ndarray, grad: np.ndarray, hess: np.ndarray) -> 'XGBoostTree':
+        self.root = self._build_tree(X, grad, hess)
+        return self
+
+    def predict(self, X: np.ndarray) -> np.ndarray:
+        return np.array([self._predict_sample(x, self.root) for x in X])
+
+class XGBoost:
+    def __init__(self, objective: str = "reg:squarederror", n_estimators: int = 100,
+                 learning_rate: float = 0.3, max_depth: int = 6,
+                 min_child_weight: float = 1.0, subsample: float = 1.0,
+                 colsample_bytree: float = 1.0, lambda_: float = 1.0,
+                 gamma: float = 0.0, random_state: int = None):
+        self.objective = XGBoostObjective(objective)
+        self.n_estimators = n_estimators
+        self.learning_rate = learning_rate
+        self.max_depth = max_depth
+        self.min_child_weight = min_child_weight
+        self.subsample = subsample
+        self.colsample_bytree = colsample_bytree
+        self.lambda_ = lambda_
+        self.gamma = gamma
+        self.random_state = random_state
+        self.rng = np.random.default_rng(random_state)
+
+        self.trees = []
+        self.base_score = 0.5
+
+    def _sigmoid(self, x: np.ndarray) -> np.ndarray:
+        return 1 / (1 + np.exp(-x))
+
+    def _compute_gradients(self, y: np.ndarray, pred: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
+        if self.objective == XGBoostObjective.REG_SQUAREDERROR:
+            grad = pred - y
+            hess = np.ones_like(pred)
+        else:
+            prob = self._sigmoid(pred)
+            grad = prob - y
+            hess = prob * (1 - prob)
+        return grad, hess
+
+    def _subsample_data(self, X: np.ndarray, y: np.ndarray,
+                        grad: np.ndarray, hess: np.ndarray) -> tuple:
+        # Row subsampling
+        if self.subsample < 1.0:
+            n_samples = int(X.shape[0] * self.subsample)
+            indices = self.rng.choice(X.shape[0], size=n_samples, replace=False)
+            X = X[indices]
+            y = y[indices]
+            grad = grad[indices]
+            hess = hess[indices]
+
+        # Column subsampling: keep the selected feature indices so that
+        # predictions can later be made on the same columns
+        feature_indices = np.arange(X.shape[1])
+        if self.colsample_bytree < 1.0:
+            n_features = int(X.shape[1] * self.colsample_bytree)
+            feature_indices = self.rng.choice(X.shape[1], size=n_features, replace=False)
+            X = X[:, feature_indices]
+
+        return X, y, grad, hess, feature_indices
+
+    def fit(self, X: np.ndarray, y: np.ndarray) -> 'XGBoost':
+        if self.objective == XGBoostObjective.BINARY_LOGISTIC:
+            y = (y > 0).astype(np.float64)
+            mean_y = np.clip(np.mean(y), 1e-6, 1 - 1e-6)
+            self.base_score = np.log(mean_y / (1 - mean_y))
+        else:
+            self.base_score = np.mean(y)
+
+        predictions = np.full(X.shape[0], self.base_score)
+        self.trees = []
+
+        for _ in range(self.n_estimators):
+            grad, hess = self._compute_gradients(y, predictions)
+
+            X_tree, y_tree, grad_tree, hess_tree, feat_idx = self._subsample_data(X, y, grad, hess)
+
+            tree = XGBoostTree(
+                max_depth=self.max_depth,
+                min_child_weight=self.min_child_weight,
+                lambda_=self.lambda_,
+                gamma=self.gamma
+            )
+            tree.fit(X_tree, grad_tree, hess_tree)
+            self.trees.append((tree, feat_idx))
+
+            predictions += self.learning_rate * tree.predict(X[:, feat_idx])
+
+        return self
+
+    def predict(self, X: np.ndarray) -> np.ndarray:
+        predictions = np.full(X.shape[0], self.base_score)
+
+        for tree, feat_idx in self.trees:
+            predictions += self.learning_rate * tree.predict(X[:, feat_idx])
+
+        if self.objective == XGBoostObjective.BINARY_LOGISTIC:
+            return (self._sigmoid(predictions) >= 0.5).astype(int)
+        return predictions
+
+    def predict_proba(self, X: np.ndarray) -> np.ndarray:
+        if self.objective != XGBoostObjective.BINARY_LOGISTIC:
+            raise ValueError("predict_proba is only available for binary classification")
+
+        predictions = np.full(X.shape[0], self.base_score)
+
+        for tree, feat_idx in self.trees:
+            predictions += self.learning_rate * tree.predict(X[:, feat_idx])
+
+        proba = self._sigmoid(predictions)
+        return np.vstack([1 - proba, proba]).T
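
Usage sketch for reviewers (not part of the patch): a minimal end-to-end run of
the new class, assuming the package is importable as neuralnetlib and using
synthetic data invented purely for illustration.

    import numpy as np
    from neuralnetlib.ensemble import XGBoost

    rng = np.random.default_rng(0)
    X = rng.normal(size=(200, 5))
    # Hypothetical binary labels derived from the first two features
    y = (X[:, 0] + 0.5 * X[:, 1] > 0).astype(int)

    model = XGBoost(objective="binary:logistic", n_estimators=50,
                    learning_rate=0.1, max_depth=3, subsample=0.8,
                    colsample_bytree=0.8, random_state=0)
    model.fit(X, y)

    labels = model.predict(X)        # hard 0/1 class predictions
    proba = model.predict_proba(X)   # shape (n_samples, 2): P(y=0), P(y=1)

Setting subsample and colsample_bytree below 1.0 exercises the row- and
column-subsampling paths; each tree is stored together with the feature indices
it was trained on, so predict and predict_proba apply it to the same columns.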