feat(ensemble): add XGBoost
marcpinet committed Dec 3, 2024
1 parent 90a9194 commit 4280565
Showing 1 changed file with 212 additions and 0 deletions.
neuralnetlib/ensemble.py
@@ -593,3 +593,215 @@ def predict_proba(self, X):

proba = self._sigmoid(predictions)
return np.vstack([1 - proba, proba]).T



class XGBoostObjective(Enum):
REG_SQUAREDERROR = "reg:squarederror"
BINARY_LOGISTIC = "binary:logistic"

class XGBoostNode:
    def __init__(self):
        self.feature_idx: int | None = None
        self.threshold: float | None = None
        self.left: "XGBoostNode | None" = None
        self.right: "XGBoostNode | None" = None
        self.value: float | None = None
        self.gain: float = 0.0
        self.cover: float = 0.0

class XGBoostTree:
def __init__(self, max_depth: int = 6, min_child_weight: float = 1.0,
lambda_: float = 1.0, gamma: float = 0.0):
self.max_depth = max_depth
self.min_child_weight = min_child_weight
self.lambda_ = lambda_
self.gamma = gamma
self.root = None

def _calc_leaf_value(self, grad: np.ndarray, hess: np.ndarray) -> float:
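        # Optimal leaf weight for the second-order objective: w* = -G / (H + lambda)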
return -np.sum(grad) / (np.sum(hess) + self.lambda_)

def _calc_gain(self, grad: np.ndarray, hess: np.ndarray) -> float:
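        # Node quality score G^2 / (H + lambda); the conventional 1/2 factor is omitted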
G, H = np.sum(grad), np.sum(hess)
return (G * G) / (H + self.lambda_)

def _find_best_split(self, X: np.ndarray, grad: np.ndarray, hess: np.ndarray) -> tuple:
best_gain = 0.0
best_feature_idx = None
best_threshold = None
total_gain = self._calc_gain(grad, hess)
n_features = X.shape[1]

for feature_idx in range(n_features):
feature_values = X[:, feature_idx]
unique_values = np.unique(feature_values)

if len(unique_values) <= 1:
continue

thresholds = (unique_values[:-1] + unique_values[1:]) / 2

for threshold in thresholds:
left_mask = feature_values <= threshold
right_mask = ~left_mask

if (np.sum(hess[left_mask]) < self.min_child_weight or
np.sum(hess[right_mask]) < self.min_child_weight):
continue

left_gain = self._calc_gain(grad[left_mask], hess[left_mask])
right_gain = self._calc_gain(grad[right_mask], hess[right_mask])
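                # Split gain: children's scores minus the parent's, minus the gamma complexity penalty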
gain = left_gain + right_gain - total_gain - self.gamma

if gain > best_gain:
best_gain = gain
best_feature_idx = feature_idx
best_threshold = threshold

return best_feature_idx, best_threshold, best_gain

def _build_tree(self, X: np.ndarray, grad: np.ndarray, hess: np.ndarray,
depth: int = 0) -> XGBoostNode:
node = XGBoostNode()
node.cover = np.sum(hess)

if depth >= self.max_depth:
node.value = self._calc_leaf_value(grad, hess)
return node

feature_idx, threshold, gain = self._find_best_split(X, grad, hess)

if feature_idx is None or gain <= 0:
node.value = self._calc_leaf_value(grad, hess)
return node

left_mask = X[:, feature_idx] <= threshold
right_mask = ~left_mask

node.feature_idx = feature_idx
node.threshold = threshold
node.gain = gain
node.left = self._build_tree(X[left_mask], grad[left_mask],
hess[left_mask], depth + 1)
node.right = self._build_tree(X[right_mask], grad[right_mask],
hess[right_mask], depth + 1)

return node

def _predict_sample(self, x: np.ndarray, node: XGBoostNode) -> float:
if node.value is not None:
return node.value

if x[node.feature_idx] <= node.threshold:
return self._predict_sample(x, node.left)
return self._predict_sample(x, node.right)

def fit(self, X: np.ndarray, grad: np.ndarray, hess: np.ndarray) -> 'XGBoostTree':
self.root = self._build_tree(X, grad, hess)
return self

def predict(self, X: np.ndarray) -> np.ndarray:
return np.array([self._predict_sample(x, self.root) for x in X])
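
A quick sanity check on the tree in isolation (an illustrative sketch, not part of the commit): with unit hessians and a constant feature, no split is possible, so the tree reduces to a single leaf whose value is -sum(grad) / (n + lambda_), the mean negative gradient shrunk toward zero by the regularizer.

import numpy as np

# Four identical residuals, squared-error hessians (all ones),
# and one constant feature so no split can be made
grad = np.array([1.0, 1.0, 1.0, 1.0])
hess = np.ones_like(grad)
X = np.zeros((4, 1))

tree = XGBoostTree(max_depth=2, lambda_=1.0)
tree.fit(X, grad, hess)
print(tree.predict(X))   # [-0.8 -0.8 -0.8 -0.8]  ==  -4 / (4 + 1)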

class XGBoost:
def __init__(self, objective: str = "reg:squarederror", n_estimators: int = 100,
learning_rate: float = 0.3, max_depth: int = 6,
min_child_weight: float = 1.0, subsample: float = 1.0,
colsample_bytree: float = 1.0, lambda_: float = 1.0,
gamma: float = 0.0, random_state: int = None):
self.objective = XGBoostObjective(objective)
self.n_estimators = n_estimators
self.learning_rate = learning_rate
self.max_depth = max_depth
self.min_child_weight = min_child_weight
self.subsample = subsample
self.colsample_bytree = colsample_bytree
self.lambda_ = lambda_
self.gamma = gamma
self.random_state = random_state
self.rng = np.random.default_rng(random_state)

self.trees = []
self.base_score = 0.5

    def _sigmoid(self, x: np.ndarray) -> np.ndarray:
        # Clip logits to avoid overflow in np.exp for extreme values
        return 1 / (1 + np.exp(-np.clip(x, -500, 500)))

def _compute_gradients(self, y: np.ndarray, pred: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
if self.objective == XGBoostObjective.REG_SQUAREDERROR:
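            # Squared error L = 1/2 (pred - y)^2:  g = pred - y,  h = 1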
grad = pred - y
hess = np.ones_like(y)
else:
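            # Log loss on logits:  g = sigmoid(pred) - y,  h = p * (1 - p)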
prob = self._sigmoid(pred)
grad = prob - y
hess = prob * (1 - prob)
return grad, hess

    def _subsample_data(self, X: np.ndarray, grad: np.ndarray,
                        hess: np.ndarray) -> tuple:
        # Row subsampling: each tree sees a random fraction of the samples
        if self.subsample < 1.0:
            n_samples = int(X.shape[0] * self.subsample)
            indices = self.rng.choice(X.shape[0], size=n_samples, replace=False)
            X = X[indices]
            grad = grad[indices]
            hess = hess[indices]

        # Column subsampling: remember which columns were kept so that
        # split feature indices can be mapped back to the full feature space
        feature_indices = np.arange(X.shape[1])
        if self.colsample_bytree < 1.0:
            n_features = int(X.shape[1] * self.colsample_bytree)
            feature_indices = self.rng.choice(X.shape[1], size=n_features, replace=False)
            X = X[:, feature_indices]

        return X, grad, hess, feature_indices

def fit(self, X: np.ndarray, y: np.ndarray) -> 'XGBoost':
if self.objective == XGBoostObjective.BINARY_LOGISTIC:
y = (y > 0).astype(np.float64)
            p = np.clip(np.mean(y), 1e-6, 1 - 1e-6)
            self.base_score = np.log(p / (1 - p))
else:
self.base_score = np.mean(y)

predictions = np.full(X.shape[0], self.base_score)
self.trees = []

for _ in range(self.n_estimators):
grad, hess = self._compute_gradients(y, predictions)

            X_tree, grad_tree, hess_tree, feature_indices = self._subsample_data(X, grad, hess)

            tree = XGBoostTree(
                max_depth=self.max_depth,
                min_child_weight=self.min_child_weight,
                lambda_=self.lambda_,
                gamma=self.gamma
            )
            tree.fit(X_tree, grad_tree, hess_tree)
            # Keep the column subset with its tree: split indices refer to it
            self.trees.append((tree, feature_indices))

            predictions += self.learning_rate * tree.predict(X[:, feature_indices])

return self

def predict(self, X: np.ndarray) -> np.ndarray:
predictions = np.full(X.shape[0], self.base_score)

        for tree, feature_indices in self.trees:
            predictions += self.learning_rate * tree.predict(X[:, feature_indices])

if self.objective == XGBoostObjective.BINARY_LOGISTIC:
return (self._sigmoid(predictions) >= 0.5).astype(int)
return predictions

def predict_proba(self, X: np.ndarray) -> np.ndarray:
if self.objective != XGBoostObjective.BINARY_LOGISTIC:
raise ValueError("predict_proba is only available for binary classification")

predictions = np.full(X.shape[0], self.base_score)

        for tree, feature_indices in self.trees:
            predictions += self.learning_rate * tree.predict(X[:, feature_indices])

proba = self._sigmoid(predictions)
return np.vstack([1 - proba, proba]).T
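
A minimal usage sketch of the new estimator (illustrative, not part of the commit; it assumes numpy and Enum are imported at the top of ensemble.py, outside this hunk, and the data and hyperparameter values are arbitrary):

import numpy as np
from neuralnetlib.ensemble import XGBoost

rng = np.random.default_rng(42)
X = rng.normal(size=(200, 5))
y = (X[:, 0] + 0.5 * X[:, 1] > 0).astype(int)   # synthetic binary labels

model = XGBoost(objective="binary:logistic", n_estimators=50,
                learning_rate=0.1, max_depth=3, subsample=0.8,
                colsample_bytree=0.8, random_state=42)
model.fit(X, y)

labels = model.predict(X)        # hard 0/1 predictions
proba = model.predict_proba(X)   # shape (n_samples, 2): P(y=0), P(y=1)
print("train accuracy:", np.mean(labels == y))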
