diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 00000000..0106924b
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,50 @@
+# Ignore Git repository files
+.git
+.gitignore
+
+# Ignore Python virtual environments
+.venv/
+env/
+venv/
+
+# Ignore log files
+*.log
+
+# Ignore Python cache and compiled files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# Ignore pytest cache
+.pytest_cache/
+
+# Ignore mypy cache
+.mypy_cache/
+
+# Ignore coverage reports
+.coverage
+htmlcov/
+
+# Ignore temporary files
+*.swp
+*~
+*.tmp
+*.temp
+
+# Ignore build artifacts
+build/
+dist/
+.eggs/
+*.egg-info/
+
+# Ignore IDE/editor configurations
+.vscode/
+.idea/
+
+# Ignore macOS Finder files
+.DS_Store
+
+# Ignore other system files
+Thumbs.db
+desktop.ini
+
diff --git a/.gitignore b/.gitignore
index d361c894..a018b1a8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -46,7 +46,6 @@ coverage.xml
 .ropeproject
 
 # Django stuff:
-*.log
 *.pot
 
 # Sphinx documentation
diff --git a/Dockerfile b/Dockerfile
index c3b71ba1..370f7777 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,28 +1,25 @@
-# Use the official Ubuntu 24.04 LTS as the base image
-FROM ubuntu:24.04
+# Single-stage build of the LDSC runtime image
+# Use the official Ubuntu 23.04 as the base image
+FROM ubuntu:23.04
 
 # Set environment variables
 ENV PYTHONUNBUFFERED=1 \
     POETRY_VIRTUALENVS_CREATE=false \
     POETRY_NO_INTERACTION=1
 
-# Update and install system dependencies
+# Update and install system dependencies for building
 RUN apt-get update && \
     apt-get install -y --no-install-recommends \
     build-essential \
     curl \
-    wget \
-    git \
-    python3.12 \
-    python3.12-venv \
-    python3.12-dev \
-    samtools \
-    bedtools \
+    python3.11 \
+    python3.11-venv \
+    python3.11-dev \
     && rm -rf /var/lib/apt/lists/*
 
 # Ensure python3 and pip3 are the default
-RUN update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.12 1 && \
-    update-alternatives --install /usr/bin/pip3 pip3 /usr/bin/pip3.12 1
+RUN update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.11 1
+RUN update-alternatives --install /usr/bin/python python /usr/bin/python3.11 10
 
 # Install Poetry
 RUN curl -sSL https://install.python-poetry.org | python3 -
@@ -36,11 +33,14 @@ WORKDIR /app
 # Copy pyproject.toml and poetry.lock if available
 COPY pyproject.toml poetry.lock* /app/
 
-# Install project dependencies
-RUN poetry install --no-root --only main
+# Install project dependencies without installing the package itself
+RUN poetry install --no-root --only main
 
 # Copy the rest of the project files
 COPY . /app
 
-# Install the project
-RUN poetry install --no-dev
\ No newline at end of file
+# Install the package
+RUN poetry install
+
+# Run the installed console scripts (ldsc, munge_sumstats) through Poetry
+ENTRYPOINT ["poetry", "run"]
\ No newline at end of file
diff --git a/README.md b/README.md
index 63918c12..7871eb85 100644
--- a/README.md
+++ b/README.md
@@ -79,8 +79,8 @@ Key publications:
 Run the help command to verify that LDSC is installed correctly:
 
 ```bash
- python ldsc.py -h
- python munge_sumstats.py -h
+ ldsc -h
+ munge_sumstats -h
 ```
 
 If these commands display help messages with available options, the installation was successful.
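As a usage note for the new `ENTRYPOINT ["poetry", "run"]`, here is a minimal sketch of building and running the image. The `ldsc-image` tag and the example invocations are placeholders, not part of this diff:

```bash
# Build the image from the repository root (the tag name is arbitrary)
docker build -t ldsc-image .

# Because the entrypoint is `poetry run`, the console script and its
# arguments are supplied as the container command
docker run --rm ldsc-image ldsc -h
docker run --rm ldsc-image munge_sumstats -h
```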
diff --git a/ldsc.py b/ldsc.py
index 6801dbde..97ea1c90 100755
--- a/ldsc.py
+++ b/ldsc.py
@@ -303,7 +303,7 @@ def ldscore(args, log):
         array_snps,
         keep_snps=keep_snps,
         keep_indivs=keep_indivs,
-        mafMin=args.maf,
+        maf_min=args.maf,
     )
 
     # filter annot_matrix down to only SNPs passing MAF cutoffs
@@ -780,8 +779,8 @@ def ldscore(args, log):
     help="Population prevalence of binary phenotype (for conversion to liability scale).",
 )
 
 
-if __name__ == "__main__":
+def main():
     args = parser.parse_args()
     if args.out is None:
         raise ValueError("--out is required.")
@@ -861,3 +860,7 @@ def ldscore(args, log):
     log.log("Analysis finished at {T}".format(T=time.ctime()))
     time_elapsed = round(time.time() - start_time, 2)
     log.log("Total time elapsed: {T}".format(T=sec_to_str(time_elapsed)))
+
+
+if __name__ == "__main__":
+    main()
diff --git a/ldscore/irwls.py b/ldscore/irwls.py
index a33a7c3d..c8078c6b 100644
--- a/ldscore/irwls.py
+++ b/ldscore/irwls.py
@@ -1,187 +1,234 @@
 """
-(c) 2015 Brendan Bulik-Sullivan and Hilary Finucane
+Iteratively Re-weighted Least Squares (IRWLS) module.
 
-Iterativey re-weighted least squares.
+This module provides the IRWLS class, which implements iteratively re-weighted
+least squares for regression analysis, including methods for jackknife variance
+estimation.
 
+(c) 2015 Brendan Bulik-Sullivan and Hilary Finucane
+(c) 2024 Thomas Reimonn
 """
+
+from typing import Callable, Optional, Union
+
 import numpy as np
 
 from . import jackknife as jk
 
 
-class IRWLS(object):
+class IRWLS:
     """
-    Iteratively re-weighted least squares (FLWS).
-
-    Parameters
-    ----------
-    x : np.matrix with shape (n, p)
-        Independent variable.
-    y : np.matrix with shape (n, 1)
-        Dependent variable.
-    update_func : function
-        Transforms output of np.linalg.lstsq to new weights.
-    n_blocks : int
-        Number of jackknife blocks (for estimating SE via block jackknife).
-    w : np.matrix with shape (n, 1)
-        Initial regression weights (default is the identity matrix). These should be on the
-        inverse CVF scale.
-    slow : bool
-        Use slow block jackknife? (Mostly for testing)
-
-    Attributes
-    ----------
-    est : np.matrix with shape (1, p)
-        IRWLS estimate.
-    jknife_est : np.matrix with shape (1, p)
-        Jackknifed estimate.
-    jknife_var : np.matrix with shape (1, p)
-        Variance of jackknifed estimate.
-    jknife_se : np.matrix with shape (1, p)
-        Standard error of jackknifed estimate, equal to sqrt(jknife_var).
-    jknife_cov : np.matrix with shape (p, p)
-        Covariance matrix of jackknifed estimate.
-    delete_values : np.matrix with shape (n_blocks, p)
-        Jackknife delete values.
-
-    Methods
-    -------
-    wls(x, y, w) :
-        Weighted Least Squares.
-    _weight(x, w) :
-        Weight x by w.
+    Iteratively Re-weighted Least Squares (IRWLS) estimator.
+
+    This class implements the IRWLS algorithm for estimating regression coefficients,
+    allowing for heteroscedasticity or other forms of non-constant variance in the
+    residuals. It also provides jackknife variance estimation using block jackknife.
+
+    Attributes:
+        est (np.ndarray): Estimated regression coefficients (shape: (1, n_features)).
+        jknife_est (np.ndarray): Jackknife estimates of the regression coefficients
+            (shape: (1, n_features)).
+        jknife_var (np.ndarray): Variance of the jackknife estimates (shape: (1, n_features)).
+        jknife_se (np.ndarray): Standard errors of the jackknife estimates
+            (shape: (1, n_features)).
+        jknife_cov (np.ndarray): Covariance matrix of the jackknife estimates
+            (shape: (n_features, n_features)).
+        delete_values (np.ndarray): Jackknife delete values (shape: (n_blocks, n_features)).
+ separators (Optional[np.ndarray]): Block boundaries for jackknife + (shape: (n_blocks + 1,)). """ - def __init__(self, x, y, update_func, n_blocks, w=None, slow=False, separators=None): - n, p = jk._check_shape(x, y) + def __init__( + self, + X: np.ndarray, + y: np.ndarray, + update_func: Callable[[tuple], np.ndarray], + n_blocks: int, + w: Optional[np.ndarray] = None, + slow: bool = False, + separators: Optional[np.ndarray] = None, + max_iter: int = 2, + ) -> None: + """ + Initialize the IRWLS estimator. + + Args: + X (np.ndarray): Independent variables (shape: (n_samples, n_features)). + y (np.ndarray): Dependent variable (shape: (n_samples,) or (n_samples, 1)). + update_func (Callable[[tuple], np.ndarray]): Function to update weights. + Should take the output of np.linalg.lstsq and return new weights + (shape: (n_samples, 1)). + n_blocks (int): Number of jackknife blocks for variance estimation. + w (Optional[np.ndarray]): Initial regression weights (shape: (n_samples,) or + (n_samples, 1)). Defaults to ones if None. + slow (bool): Whether to use the slow block jackknife method (for testing). + separators (Optional[np.ndarray]): Optional block boundaries for jackknife. + max_iter (int): Maximum number of iterations for the IRWLS algorithm. + + Raises: + ValueError: If input arrays have incompatible shapes. + """ + n_samples, _ = X.shape + y = y.reshape(-1, 1) if w is None: - w = np.ones_like(y) - if w.shape != (n, 1): - raise ValueError("w has shape {S}. w must have shape ({N}, 1).".format(S=w.shape, N=n)) - - jknife = self.irwls(x, y, update_func, n_blocks, w, slow=slow, separators=separators) + w = np.ones((n_samples, 1)) + else: + w = w.reshape(-1, 1) + + if w.shape != (n_samples, 1): + raise ValueError(f"w has shape {w.shape}. Expected shape: ({n_samples}, 1).") + + jknife = self.irwls( + X, + y, + update_func, + n_blocks, + w, + slow=slow, + separators=separators, + max_iter=max_iter, + ) self.est = jknife.est - self.jknife_se = jknife.jknife_se self.jknife_est = jknife.jknife_est self.jknife_var = jknife.jknife_var + self.jknife_se = jknife.jknife_se self.jknife_cov = jknife.jknife_cov self.delete_values = jknife.delete_values self.separators = jknife.separators @classmethod - def irwls(cls, x, y, update_func, n_blocks, w, slow=False, separators=None): + def irwls( + cls, + X: np.ndarray, + y: np.ndarray, + update_func: Callable[[tuple], np.ndarray], + n_blocks: int, + w: np.ndarray, + slow: bool = False, + separators: Optional[np.ndarray] = None, + max_iter: int = 2, + ) -> Union[jk.LstsqJackknifeFast, jk.LstsqJackknifeSlow]: """ - Iteratively re-weighted least squares (IRWLS). - - Parameters - ---------- - x : np.matrix with shape (n, p) - Independent variable. - y : np.matrix with shape (n, 1) - Dependent variable. - update_func: function - Transforms output of np.linalg.lstsq to new weights. - n_blocks : int - Number of jackknife blocks (for estimating SE via block jackknife). - w : np.matrix with shape (n, 1) - Initial regression weights. - slow : bool - Use slow block jackknife? (Mostly for testing) - separators : list or None - Block jackknife block boundaries (optional). - - Returns - ------- - jknife : jk.LstsqJackknifeFast - Block jackknife regression with the final IRWLS weights. - + Perform Iteratively Re-weighted Least Squares (IRWLS). + + Args: + X (np.ndarray): Independent variables (shape: (n_samples, n_features)). + y (np.ndarray): Dependent variable (shape: (n_samples, 1)). + update_func (Callable[[tuple], np.ndarray]): Function to update weights. 
+ n_blocks (int): Number of jackknife blocks. + w (np.ndarray): Initial regression weights (shape: (n_samples, 1)). + slow (bool): Whether to use the slow block jackknife method. + separators (Optional[np.ndarray]): Optional block boundaries. + max_iter (int): Maximum number of iterations for the IRWLS algorithm. + + Returns: + Union[jk.LstsqJackknifeFast, jk.LstsqJackknifeSlow]: Jackknife regression object + with final IRWLS weights. + + Raises: + ValueError: If input arrays have incompatible shapes or weights are invalid. """ - (n, p) = x.shape - if y.shape != (n, 1): - raise ValueError("y has shape {S}. y must have shape ({N}, 1).".format(S=y.shape, N=n)) - if w.shape != (n, 1): - raise ValueError("w has shape {S}. w must have shape ({N}, 1).".format(S=w.shape, N=n)) - - w = np.sqrt(w) - for i in range(2): # update this later - new_w = np.sqrt(update_func(cls.wls(x, y, w))) - if new_w.shape != w.shape: - print("IRWLS update:", new_w.shape, w.shape) - raise ValueError("New weights must have same shape.") - else: - w = new_w - - x = cls._weight(x, w) - y = cls._weight(y, w) + n_samples, _ = X.shape + y = y.reshape(-1, 1) + w = w.reshape(-1, 1) + + if y.shape != (n_samples, 1): + raise ValueError(f"y has shape {y.shape}. Expected shape: ({n_samples}, 1).") + if w.shape != (n_samples, 1): + raise ValueError(f"w has shape {w.shape}. Expected shape: ({n_samples}, 1).") + + # Initialize weights + w_sqrt = np.sqrt(w) + + # Iteratively update weights + for iteration in range(max_iter): + coef = cls.wls(X, y, w_sqrt) + new_w = np.sqrt(update_func(coef)) + if new_w.shape != w_sqrt.shape: + raise ValueError(f"New weights have shape {new_w.shape}, expected {w_sqrt.shape}.") + w_sqrt = new_w + + # Weight the data + X_weighted = cls._weight(X, w_sqrt) + y_weighted = cls._weight(y, w_sqrt) + + # Perform jackknife estimation if slow: - jknife = jk.LstsqJackknifeSlow(x, y, n_blocks, separators=separators) + jknife = jk.LstsqJackknifeSlow(X_weighted, y_weighted, n_blocks, separators=separators) else: - jknife = jk.LstsqJackknifeFast(x, y, n_blocks, separators=separators) + jknife = jk.LstsqJackknifeFast(X_weighted, y_weighted, n_blocks, separators=separators) return jknife @classmethod - def wls(cls, x, y, w): + def wls( + cls, + X: np.ndarray, + y: np.ndarray, + w_sqrt: np.ndarray, + ) -> tuple: """ - Weighted least squares. - - Parameters - ---------- - x : np.matrix with shape (n, p) - Independent variable. - y : np.matrix with shape (n, 1) - Dependent variable. - w : np.matrix with shape (n, 1) - Regression weights (1/CVF scale). - - Returns - ------- - coef : list with four elements (coefficients, residuals, rank, singular values) - Output of np.linalg.lstsq + Perform Weighted Least Squares regression. + + Args: + X (np.ndarray): Independent variables (shape: (n_samples, n_features)). + y (np.ndarray): Dependent variable (shape: (n_samples, 1)). + w_sqrt (np.ndarray): Square root of weights (shape: (n_samples, 1)). + Returns: + tuple: Output of np.linalg.lstsq (coefficients, residuals, rank, singular values). + + Raises: + ValueError: If input arrays have incompatible shapes. """ - (n, p) = x.shape - if y.shape != (n, 1): - raise ValueError("y has shape {S}. y must have shape ({N}, 1).".format(S=y.shape, N=n)) - if w.shape != (n, 1): - raise ValueError("w has shape {S}. 
w must have shape ({N}, 1).".format(S=w.shape, N=n)) - - x = cls._weight(x, w) - y = cls._weight(y, w) - coef = np.linalg.lstsq(x, y) + n_samples, _ = X.shape + y = y.reshape(-1, 1) + w_sqrt = w_sqrt.reshape(-1, 1) + + if y.shape != (n_samples, 1): + raise ValueError(f"y has shape {y.shape}. Expected shape: ({n_samples}, 1).") + if w_sqrt.shape != (n_samples, 1): + raise ValueError(f"w_sqrt has shape {w_sqrt.shape}. Expected shape: ({n_samples}, 1).") + + # Weight the data + X_weighted = cls._weight(X, w_sqrt) + y_weighted = cls._weight(y, w_sqrt) + + # Perform least squares regression + coef = np.linalg.lstsq(X_weighted, y_weighted, rcond=None) + return coef - @classmethod - def _weight(cls, x, w): + @staticmethod + def _weight( + X: np.ndarray, + w_sqrt: np.ndarray, + ) -> np.ndarray: """ - Weight x by w. + Weight the data matrix X by w_sqrt. - Parameters - ---------- - x : np.matrix with shape (n, p) - Rows are observations. - w : np.matrix with shape (n, 1) - Regression weights (1 / sqrt(CVF) scale). + Args: + X (np.ndarray): Data matrix (shape: (n_samples, n_features) or (n_samples, 1)). + w_sqrt (np.ndarray): Square root of weights (shape: (n_samples, 1)). - Returns - ------- - x_new : np.matrix with shape (n, p) - x_new[i,j] = x[i,j] * w'[i], where w' is w normalized to have sum 1. - - Raises - ------ - ValueError : - If any element of w is <= 0 (negative weights are not meaningful in WLS). + Returns: + np.ndarray: Weighted data matrix (shape: (n_samples, n_features) or (n_samples, 1)). + Raises: + ValueError: If weights contain non-positive values or shapes are incompatible. """ - if np.any(w <= 0): - raise ValueError("Weights must be > 0") - (n, p) = x.shape - if w.shape != (n, 1): - raise ValueError("w has shape {S}. w must have shape (n, 1).".format(S=w.shape)) - - w = w / float(np.sum(w)) - x_new = np.multiply(x, w) - return x_new + if np.any(w_sqrt <= 0): + raise ValueError("Weights must be positive.") + + n_samples = X.shape[0] + if w_sqrt.shape != (n_samples, 1): + raise ValueError(f"w_sqrt has shape {w_sqrt.shape}. Expected shape: ({n_samples}, 1).") + + # Normalize weights to have sum 1 + w_normalized = w_sqrt / np.sum(w_sqrt) + + # Multiply each row of X by the corresponding weight + X_weighted = X * w_normalized + + return X_weighted diff --git a/ldscore/ldscore.py b/ldscore/ldscore.py index bdbafc49..f7c51e1b 100644 --- a/ldscore/ldscore.py +++ b/ldscore/ldscore.py @@ -1,423 +1,604 @@ +""" +LD Score Calculation Module. + +This module provides classes and functions for calculating linkage disequilibrium (LD) scores, +which are useful in genetic studies for understanding the correlation structure of genetic variants. + +Classes: + GenotypeArrayInMemory: Base class for genotype data handling in memory. + PlinkBEDFile: Class for handling PLINK .bed genotype files. + +Functions: + get_block_lefts(coords, max_dist): Compute indices of leftmost SNPs within a specified distance. + block_left_to_right(block_left): Convert block left indices to block right indices. + +(c) 2015 Brendan Bulik-Sullivan and Hilary Finucane +(c) 2024 Thomas Reimonn +""" + +from typing import Callable, Optional, Tuple + import bitarray as ba import numpy as np -def getBlockLefts(coords, max_dist): +def get_block_lefts(coords: np.ndarray, max_dist: float) -> np.ndarray: """ - Converts coordinates + max block length to the a list of coordinates of the leftmost - SNPs to be included in blocks. + Compute indices of the leftmost SNPs within a specified maximum distance. 
- Parameters - ---------- - coords : array - Array of coordinates. Must be sorted. - max_dist : float - Maximum distance between SNPs included in the same window. + Args: + coords (np.ndarray): Array of genomic coordinates (must be sorted). + max_dist (float): Maximum distance between SNPs to be included in the same window. - Returns - ------- - block_left : 1D np.ndarray with same length as block_left - block_left[j] := min{k | dist(j, k) < max_dist}. + Returns: + np.ndarray: Array where each element is the index of the leftmost SNP included + in the LD score calculation for the corresponding SNP. + Raises: + ValueError: If coords is not a one-dimensional array. """ + if coords.ndim != 1: + raise ValueError("coords must be a one-dimensional array.") M = len(coords) + block_left = np.zeros(M, dtype=int) j = 0 - block_left = np.zeros(M) for i in range(M): while j < M and abs(coords[j] - coords[i]) > max_dist: j += 1 - block_left[i] = j - return block_left -def block_left_to_right(block_left): +def block_left_to_right(block_left: np.ndarray) -> np.ndarray: """ - Converts block lefts to block rights. - - Parameters - ---------- - block_left : array - Array of block lefts. + Convert block left indices to block right indices. - Returns - ------- - block_right : 1D np.ndarray with same length as block_left - block_right[j] := max {k | block_left[k] <= j} + Args: + block_left (np.ndarray): Array of block left indices. + Returns: + np.ndarray: Array where each element is the index of the rightmost SNP included + in the LD score calculation for the corresponding SNP. """ M = len(block_left) + block_right = np.zeros(M, dtype=int) j = 0 - block_right = np.zeros(M) for i in range(M): while j < M and block_left[j] <= i: j += 1 - block_right[i] = j - return block_right -class __GenotypeArrayInMemory__(object): +class GenotypeArrayInMemory: """ - Parent class for various classes containing interfaces for files with genotype - matrices, e.g., plink .bed files, etc + Base class for genotype data handling in memory. + + This class provides methods to read genotype data, filter SNPs and individuals, + and compute LD scores. + + Attributes: + m (int): Number of SNPs. + n (int): Number of individuals. + df (np.ndarray): SNP metadata array (e.g., chromosome, SNP ID, base pair position). + colnames (list): Column names for the SNP metadata. + maf_min (float): Minimum minor allele frequency for filtering. + geno (bitarray.bitarray): Bitarray representing genotype data. + kept_snps (list): Indices of SNPs kept after filtering. + freq (np.ndarray): Allele frequencies of the kept SNPs. + maf (np.ndarray): Minor allele frequencies of the kept SNPs. + sqrtpq (np.ndarray): Square root of p * q for each SNP. """ - def __init__(self, fname, n, snp_list, keep_snps=None, keep_indivs=None, mafMin=None): + def __init__( + self, + fname: str, + n: int, + snp_list, + keep_snps: Optional[np.ndarray] = None, + keep_indivs: Optional[np.ndarray] = None, + maf_min: Optional[float] = None, + ) -> None: + """ + Initialize the GenotypeArrayInMemory object. + + Args: + fname (str): Filename of the genotype data. + n (int): Number of individuals. + snp_list: SNP list object containing SNP metadata. + keep_snps (Optional[np.ndarray]): Indices of SNPs to keep. + keep_indivs (Optional[np.ndarray]): Indices of individuals to keep. + maf_min (Optional[float]): Minimum minor allele frequency for filtering. + + Raises: + ValueError: If filtering results in zero individuals or SNPs remaining. 
+ """ self.m = len(snp_list.IDList) self.n = n self.keep_snps = keep_snps self.keep_indivs = keep_indivs self.df = np.array(snp_list.df[["CHR", "SNP", "BP", "CM"]]) self.colnames = ["CHR", "SNP", "BP", "CM"] - self.mafMin = mafMin if mafMin is not None else 0 - self._currentSNP = 0 - (self.nru, self.geno) = self.__read__(fname, self.m, n) - # filter individuals - if keep_indivs is not None: - keep_indivs = np.array(keep_indivs, dtype="int") - if np.any(keep_indivs > self.n): - raise ValueError("keep_indivs indices out of bounds") - - (self.geno, self.m, self.n) = self.__filter_indivs__(self.geno, keep_indivs, self.m, self.n) - - if self.n > 0: - print("After filtering, {n} individuals remain".format(n=self.n)) - else: - raise ValueError("After filtering, no individuals remain") + self.maf_min = maf_min if maf_min is not None else 0.0 + self._current_snp = 0 - # filter SNPs - if keep_snps is not None: - keep_snps = np.array(keep_snps, dtype="int") - if np.any(keep_snps > self.m): # if keep_snps is None, this returns False - raise ValueError("keep_snps indices out of bounds") + self.nru, self.geno = self._read(fname, self.m, n) - (self.geno, self.m, self.n, self.kept_snps, self.freq) = self.__filter_snps_maf__( - self.geno, self.m, self.n, self.mafMin, keep_snps - ) + # Filter individuals + if self.keep_indivs is not None: + self.geno, self.m, self.n = self._filter_indivs(self.geno, self.keep_indivs, self.m, self.n) + if self.n == 0: + raise ValueError("After filtering, no individuals remain.") + else: + print(f"After filtering, {self.n} individuals remain.") - if self.m > 0: - print("After filtering, {m} SNPs remain".format(m=self.m)) + # Filter SNPs + self.geno, self.m, self.n, self.kept_snps, self.freq = self._filter_snps_maf( + self.geno, self.m, self.n, self.maf_min, self.keep_snps + ) + if self.m == 0: + raise ValueError("After filtering, no SNPs remain.") else: - raise ValueError("After filtering, no SNPs remain") + print(f"After filtering, {self.m} SNPs remain.") self.df = self.df[self.kept_snps, :] - self.maf = np.minimum(self.freq, np.ones(self.m) - self.freq) - self.sqrtpq = np.sqrt(self.freq * (np.ones(self.m) - self.freq)) - self.df = np.c_[self.df, self.maf] + self.maf = np.minimum(self.freq, 1.0 - self.freq) + self.sqrtpq = np.sqrt(self.freq * (1.0 - self.freq)) + self.df = np.column_stack((self.df, self.maf)) self.colnames.append("MAF") - def __read__(self, fname, m, n): - raise NotImplementedError + def _read(self, fname: str, m: int, n: int) -> Tuple[int, ba.bitarray]: + """ + Read genotype data from a file. + + Args: + fname (str): Filename of the genotype data. + m (int): Number of SNPs. + n (int): Number of individuals. + + Returns: + Tuple[int, ba.bitarray]: Tuple containing the number of units (nru) and + the genotype bitarray. + + Raises: + NotImplementedError: Must be implemented in subclasses. + """ + raise NotImplementedError("Subclasses must implement the _read method.") + + def _filter_indivs( + self, geno: ba.bitarray, keep_indivs: np.ndarray, m: int, n: int + ) -> Tuple[ba.bitarray, int, int]: + """ + Filter individuals from the genotype data. + + Args: + geno (ba.bitarray): Genotype bitarray. + keep_indivs (np.ndarray): Indices of individuals to keep. + m (int): Number of SNPs. + n (int): Number of individuals. + + Returns: + Tuple[ba.bitarray, int, int]: Tuple containing the filtered genotype bitarray, + number of SNPs, and new number of individuals. + + Raises: + NotImplementedError: Must be implemented in subclasses. 
+ """ + raise NotImplementedError("Subclasses must implement the _filter_indivs method.") + + def _filter_snps_maf( + self, + geno: ba.bitarray, + m: int, + n: int, + maf_min: float, + keep_snps: Optional[np.ndarray], + ) -> Tuple[ba.bitarray, int, int, list, np.ndarray]: + """ + Filter SNPs based on minor allele frequency (MAF) and SNP indices. + + Args: + geno (ba.bitarray): Genotype bitarray. + m (int): Number of SNPs. + n (int): Number of individuals. + maf_min (float): Minimum minor allele frequency. + keep_snps (Optional[np.ndarray]): Indices of SNPs to keep. + + Returns: + Tuple containing: + - ba.bitarray: Filtered genotype bitarray. + - int: Number of polymorphic SNPs. + - int: Number of individuals. + - list: Indices of kept SNPs. + - np.ndarray: Allele frequencies of kept SNPs. + """ + raise NotImplementedError("Subclasses must implement the _filter_snps_maf method.") + + def ld_score_var_blocks(self, block_left: np.ndarray, c: int, annot: Optional[np.ndarray] = None) -> np.ndarray: + """ + Compute an unbiased estimate of LD scores using variable block sizes. + + Args: + block_left (np.ndarray): Array of block left indices. + c (int): Chunk size. + annot (Optional[np.ndarray]): SNP annotations (shape: (m, n_a)). - def __filter_indivs__(geno, keep_indivs, m, n): - raise NotImplementedError + Returns: + np.ndarray: LD scores (shape: (m, n_a)). + """ + func = lambda x: self._l2_unbiased(x, self.n) + snp_getter = self.next_snps + return self._cor_sum_var_blocks(block_left, c, func, snp_getter, annot) - def __filter_maf_(geno, m, n, maf): - raise NotImplementedError + def ld_score_block_jackknife( + self, block_left: np.ndarray, c: int, annot: Optional[np.ndarray] = None, jn: int = 10 + ) -> np.ndarray: + """ + Compute LD scores using block jackknife. - def ldScoreVarBlocks(self, block_left, c, annot=None): - """Computes an unbiased estimate of L2(j) for j=1,..,M.""" - func = lambda x: self.__l2_unbiased__(x, self.n) - snp_getter = self.nextSNPs - return self.__corSumVarBlocks__(block_left, c, func, snp_getter, annot) + Args: + block_left (np.ndarray): Array of block left indices. + c (int): Chunk size. + annot (Optional[np.ndarray]): SNP annotations. + jn (int): Number of jackknife blocks. - def ldScoreBlockJackknife(self, block_left, c, annot=None, jN=10): + Returns: + np.ndarray: LD scores with jackknife variance estimates. + """ func = lambda x: np.square(x) - snp_getter = self.nextSNPs - return self.__corSumBlockJackknife__(block_left, c, func, snp_getter, annot, jN) + snp_getter = self.next_snps + return self._cor_sum_block_jackknife(block_left, c, func, snp_getter, annot, jn) - def __l2_unbiased__(self, x, n): - denom = n - 2 if n > 2 else n # allow n<2 for testing purposes + @staticmethod + def _l2_unbiased(x: np.ndarray, n: int) -> np.ndarray: + """ + Compute an unbiased estimate of squared correlation coefficients. + + Args: + x (np.ndarray): Correlation coefficients. + n (int): Number of individuals. + + Returns: + np.ndarray: Unbiased estimate of squared correlation coefficients. 
+ + Notes: + The unbiased estimator is calculated as: + l2_unbiased = x^2 - (1 - x^2) / (n - 2) + """ + denom = n - 2 if n > 2 else n # Allow n < 2 for testing purposes sq = np.square(x) return sq - (1 - sq) / denom - # general methods for calculating sums of Pearson correlation coefficients - def __corSumVarBlocks__(self, block_left, c, func, snp_getter, annot=None): + def _cor_sum_var_blocks( + self, + block_left: np.ndarray, + c: int, + func: Callable[[np.ndarray], np.ndarray], + snp_getter: Callable[[int], np.ndarray], + annot: Optional[np.ndarray] = None, + ) -> np.ndarray: """ - Parameters - ---------- - block_left : np.ndarray with shape (M, ) - block_left[i] = index of leftmost SNP included in LD Score of SNP i. - if c > 1, then only entries that are multiples of c are examined, and it is - assumed that block_left[a*c+i] = block_left[a*c], except at - the beginning of the chromosome where the 0th SNP is included in the window. - - c : int - Chunk size. - func : function - Function to be applied to the genotype correlation matrix. Before dotting with - annot. Examples: for biased L2, np.square. For biased L4, - lambda x: np.square(np.square(x)). For L1, lambda x: x. - snp_getter : function(int) - The method to be used to get the next SNPs (normalized genotypes? Normalized - genotypes with the minor allele as reference allele? etc) - annot: numpy array with shape (m,n_a) - SNP annotations. - - Returns - ------- - cor_sum : np.ndarray with shape (M, num_annots) - Estimates. + General method for calculating sums of transformed Pearson correlation coefficients. + + Args: + block_left (np.ndarray): Array of block left indices. + c (int): Chunk size. + func (Callable[[np.ndarray], np.ndarray]): Function to apply to the correlation matrix. + snp_getter (Callable[[int], np.ndarray]): Function to retrieve SNPs. + annot (Optional[np.ndarray]): SNP annotations (shape: (m, n_a)). + Returns: + np.ndarray: Summed values after applying the function and weighting by annotations. 
""" m, n = self.m, self.n - block_sizes = np.array(np.arange(m) - block_left) - block_sizes = np.ceil(block_sizes / c) * c if annot is None: annot = np.ones((m, 1)) else: - annot_m = annot.shape[0] - if annot_m != self.m: - raise ValueError("Incorrect number of SNPs in annot") + if annot.shape[0] != m: + raise ValueError("Incorrect number of SNPs in annotations.") - n_a = annot.shape[1] # number of annotations + n_a = annot.shape[1] # Number of annotations cor_sum = np.zeros((m, n_a)) - # b = index of first SNP for which SNP 0 is not included in LD Score - b = np.nonzero(block_left > 0) - if np.any(b): - b = b[0][0] - else: - b = m - b = int(np.ceil(b / c) * c) # round up to a multiple of c + block_sizes = np.array(np.arange(m) - block_left) + block_sizes = np.ceil(block_sizes / c) * c + + b = np.nonzero(block_left > 0)[0] + b = b[0] if b.size > 0 else m + b = int(np.ceil(b / c) * c) if b > m: c = 1 b = m - l_A = 0 # l_A := index of leftmost SNP in matrix A + + l_a = 0 # Index of leftmost SNP in matrix A A = snp_getter(b) - rfuncAB = np.zeros((b, c)) - rfuncBB = np.zeros((c, c)) - # chunk inside of block - for l_B in range(0, b, c): # l_B := index of leftmost SNP in matrix B - B = A[:, l_B : l_B + c] - np.dot(A.T, B / n, out=rfuncAB) - rfuncAB = func(rfuncAB) - cor_sum[l_A : l_A + b, :] += np.dot(rfuncAB, annot[l_B : l_B + c, :]) - # chunk to right of block + rfunc_ab = np.zeros((b, c)) + rfunc_bb = np.zeros((c, c)) + + # Process chunks inside the block + for l_b in range(0, b, c): + B = A[:, l_b : l_b + c] + np.dot(A.T, B / n, out=rfunc_ab) + rfunc_ab = func(rfunc_ab) + cor_sum[l_a : l_a + b, :] += rfunc_ab @ annot[l_b : l_b + c, :] + + # Process chunks to the right of the block b0 = b md = int(c * np.floor(m / c)) end = md + 1 if md != m else md - for l_B in range(b0, end, c): - # check if the annot matrix is all zeros for this block + chunk - # this happens w/ sparse categories (i.e., pathways) - # update the block + + for l_b in range(b0, end, c): old_b = b - b = int(block_sizes[l_B]) - if l_B > b0 and b > 0: - # block_size can't increase more than c - # block_size can't be less than c unless it is zero - # both of these things make sense + b = int(block_sizes[l_b]) + if l_b > b0 and b > 0: A = np.hstack((A[:, old_b - b + c : old_b], B)) - l_A += old_b - b + c - elif l_B == b0 and b > 0: + l_a += old_b - b + c + elif l_b == b0 and b > 0: A = A[:, b0 - b : b0] - l_A = b0 - b - elif b == 0: # no SNPs to left in window, e.g., after a sequence gap - A = np.array(()).reshape((n, 0)) - l_A = l_B - if l_B == md: + l_a = b0 - b + elif b == 0: + A = np.empty((n, 0)) + l_a = l_b + if l_b == md: c = m - md - rfuncAB = np.zeros((b, c)) - rfuncBB = np.zeros((c, c)) + rfunc_ab = np.zeros((b, c)) + rfunc_bb = np.zeros((c, c)) if b != old_b: - rfuncAB = np.zeros((b, c)) + rfunc_ab = np.zeros((b, c)) B = snp_getter(c) - p1 = np.all(annot[l_A : l_A + b, :] == 0) - p2 = np.all(annot[l_B : l_B + c, :] == 0) - if p1 and p2: + if np.all(annot[l_a : l_a + b, :] == 0) and np.all(annot[l_b : l_b + c, :] == 0): continue - np.dot(A.T, B / n, out=rfuncAB) - rfuncAB = func(rfuncAB) - cor_sum[l_A : l_A + b, :] += np.dot(rfuncAB, annot[l_B : l_B + c, :]) - cor_sum[l_B : l_B + c, :] += np.dot(annot[l_A : l_A + b, :].T, rfuncAB).T - np.dot(B.T, B / n, out=rfuncBB) - rfuncBB = func(rfuncBB) - cor_sum[l_B : l_B + c, :] += np.dot(rfuncBB, annot[l_B : l_B + c, :]) + np.dot(A.T, B / n, out=rfunc_ab) + rfunc_ab = func(rfunc_ab) + cor_sum[l_a : l_a + b, :] += rfunc_ab @ annot[l_b : l_b + c, :] + cor_sum[l_b : l_b + c, :] += 
(annot[l_a : l_a + b, :].T @ rfunc_ab).T + np.dot(B.T, B / n, out=rfunc_bb) + rfunc_bb = func(rfunc_bb) + cor_sum[l_b : l_b + c, :] += rfunc_bb @ annot[l_b : l_b + c, :] return cor_sum + def next_snps(self, b: int, minor_ref: Optional[bool] = None) -> np.ndarray: + """ + Retrieve the next b SNPs from the genotype data. + + Args: + b (int): Number of SNPs to retrieve. + minor_ref (Optional[bool]): Whether to flip reference alleles to the minor allele. + + Returns: + np.ndarray: Matrix of normalized genotypes (shape: (n, b)). + + Raises: + ValueError: If b is not a positive integer or if insufficient SNPs remain. + """ + raise NotImplementedError("Subclasses must implement the next_snps method.") + -class PlinkBEDFile(__GenotypeArrayInMemory__): +class PlinkBEDFile(GenotypeArrayInMemory): """ - Interface for Plink .bed format + Class for handling PLINK .bed genotype files. + + This class provides methods to read PLINK .bed files, filter data, and compute LD scores. """ - def __init__(self, fname, n, snp_list, keep_snps=None, keep_indivs=None, mafMin=None): + def __init__( + self, + fname: str, + n: int, + snp_list, + keep_snps: Optional[np.ndarray] = None, + keep_indivs: Optional[np.ndarray] = None, + maf_min: Optional[float] = None, + ) -> None: + """ + Initialize the PlinkBEDFile object. + + Args: + fname (str): Filename of the .bed file. + n (int): Number of individuals. + snp_list: SNP list object containing SNP metadata. + keep_snps (Optional[np.ndarray]): Indices of SNPs to keep. + keep_indivs (Optional[np.ndarray]): Indices of individuals to keep. + maf_min (Optional[float]): Minimum minor allele frequency for filtering. + """ self._bedcode = { 2: ba.bitarray("11"), 9: ba.bitarray("10"), 1: ba.bitarray("01"), 0: ba.bitarray("00"), } - - __GenotypeArrayInMemory__.__init__( - self, + super().__init__( fname, n, snp_list, keep_snps=keep_snps, keep_indivs=keep_indivs, - mafMin=mafMin, + maf_min=maf_min, ) - def __read__(self, fname, m, n): - if not fname.endswith(".bed"): - raise ValueError(".bed filename must end in .bed") - - fh = open(fname, "rb") - magicNumber = ba.bitarray(endian="little") - magicNumber.fromfile(fh, 2) - bedMode = ba.bitarray(endian="little") - bedMode.fromfile(fh, 1) - e = (4 - n % 4) if n % 4 != 0 else 0 - nru = n + e - self.nru = nru - # check magic number - if magicNumber != ba.bitarray("0011011011011000"): - raise IOError("Magic number from Plink .bed file not recognized") - - if bedMode != ba.bitarray("10000000"): - raise IOError("Plink .bed file must be in default SNP-major mode") - - # check file length - self.geno = ba.bitarray(endian="little") - self.geno.fromfile(fh) - self.__test_length__(self.geno, self.m, self.nru) - return (self.nru, self.geno) - - def __test_length__(self, geno, m, nru): - exp_len = 2 * m * nru - real_len = len(geno) - if real_len != exp_len: - s = "Plink .bed file has {n1} bits, expected {n2}" - raise IOError(s.format(n1=real_len, n2=exp_len)) - - def __filter_indivs__(self, geno, keep_indivs, m, n): - n_new = len(keep_indivs) - e = (4 - n_new % 4) if n_new % 4 != 0 else 0 - nru_new = n_new + e - nru = self.nru - z = ba.bitarray(m * 2 * nru_new, endian="little") - z.setall(0) - for e, i in enumerate(keep_indivs): - z[2 * e :: 2 * nru_new] = geno[2 * i :: 2 * nru] - z[2 * e + 1 :: 2 * nru_new] = geno[2 * i + 1 :: 2 * nru] - - self.nru = nru_new - return (z, m, n_new) - - def __filter_snps_maf__(self, geno, m, n, mafMin, keep_snps): + def _read(self, fname: str, m: int, n: int) -> Tuple[int, ba.bitarray]: """ - Credit to Chris Chang 
and the Plink2 developers for this algorithm - Modified from plink_filter.c - https://github.com/chrchang/plink-ng/blob/master/plink_filter.c + Read genotype data from a PLINK .bed file. + + Args: + fname (str): Filename of the .bed file. + m (int): Number of SNPs. + n (int): Number of individuals. - Genotypes are read forwards (since we are cheating and using endian="little") + Returns: + Tuple[int, ba.bitarray]: Number of units (nru) and genotype bitarray. - A := (genotype) & 1010... - B := (genotype) & 0101... - C := (A >> 1) & B + Raises: + ValueError: If the file format is incorrect or the magic number is unrecognized. + IOError: If the .bed file is not in SNP-major mode. + """ + if not fname.endswith(".bed"): + raise ValueError("Filename must end with '.bed'.") + + with open(fname, "rb") as fh: + magic_number = ba.bitarray(endian="little") + magic_number.fromfile(fh, 2) + bed_mode = ba.bitarray(endian="little") + bed_mode.fromfile(fh, 1) + e = (4 - n % 4) if n % 4 != 0 else 0 + nru = n + e + + # Check magic number + if magic_number != ba.bitarray("0011011011011000"): + raise IOError("Unrecognized magic number in PLINK .bed file.") + + if bed_mode != ba.bitarray("10000000"): + raise IOError("PLINK .bed file must be in default SNP-major mode.") + + # Read genotype data + geno = ba.bitarray(endian="little") + geno.fromfile(fh) + self._test_length(geno, m, nru) + return nru, geno + + @staticmethod + def _test_length(geno: ba.bitarray, m: int, nru: int) -> None: + """ + Verify the length of the genotype bitarray. - Then + Args: + geno (ba.bitarray): Genotype bitarray. + m (int): Number of SNPs. + nru (int): Number of units (number of individuals plus padding). - a := A.count() = missing ct + hom major ct - b := B.count() = het ct + hom major ct - c := C.count() = hom major ct + Raises: + IOError: If the actual length does not match the expected length. + """ + expected_len = 2 * m * nru + actual_len = len(geno) + if actual_len != expected_len: + raise IOError(f"PLINK .bed file has {actual_len} bits; expected {expected_len} bits.") + + def _filter_indivs( + self, geno: ba.bitarray, keep_indivs: np.ndarray, m: int, n: int + ) -> Tuple[ba.bitarray, int, int]: + """ + Filter individuals from the genotype data. - Which implies that + Args: + geno (ba.bitarray): Genotype bitarray. + keep_indivs (np.ndarray): Indices of individuals to keep. + m (int): Number of SNPs. + n (int): Number of individuals. - missing ct = a - c - # of indivs with nonmissing genotype = n - a + c - major allele ct = b + c - major allele frequency = (b+c)/(2*(n-a+c)) - het ct + missing ct = a + b - 2*c + Returns: + Tuple[ba.bitarray, int, int]: Filtered genotype bitarray, number of SNPs, and new n. - Why does bitarray not have >> ???? + Raises: + ValueError: If keep_indivs indices are out of bounds. + """ + if np.any(keep_indivs >= n): + raise ValueError("keep_indivs indices out of bounds.") + n_new = len(keep_indivs) + e = (4 - n_new % 4) if n_new % 4 != 0 else 0 + nru_new = n_new + e + nru = self.nru + z = ba.bitarray(m * 2 * nru_new, endian="little") + z.setall(0) + for idx, i in enumerate(keep_indivs): + z[2 * idx :: 2 * nru_new] = geno[2 * i :: 2 * nru] + z[2 * idx + 1 :: 2 * nru_new] = geno[2 * i + 1 :: 2 * nru] + self.nru = nru_new + return z, m, n_new + + def _filter_snps_maf( + self, + geno: ba.bitarray, + m: int, + n: int, + maf_min: float, + keep_snps: Optional[np.ndarray], + ) -> Tuple[ba.bitarray, int, int, list, np.ndarray]: + """ + Filter SNPs based on MAF and specified SNP indices. 
+ + Args: + geno (ba.bitarray): Genotype bitarray. + m (int): Number of SNPs. + n (int): Number of individuals. + maf_min (float): Minimum minor allele frequency. + keep_snps (Optional[np.ndarray]): Indices of SNPs to keep. + + Returns: + Tuple containing: + - ba.bitarray: Filtered genotype bitarray. + - int: Number of polymorphic SNPs. + - int: Number of individuals. + - list: Indices of kept SNPs. + - np.ndarray: Allele frequencies of kept SNPs. """ nru = self.nru m_poly = 0 - y = ba.bitarray() + filtered_geno = ba.bitarray(endian="little") if keep_snps is None: keep_snps = range(m) kept_snps = [] freq = [] - for e, j in enumerate(keep_snps): + for idx, j in enumerate(keep_snps): z = geno[2 * nru * j : 2 * nru * (j + 1)] A = z[0::2] - a = A.count() B = z[1::2] + a = A.count() b = B.count() c = (A & B).count() - major_ct = b + c # number of copies of the major allele - n_nomiss = n - a + c # number of individuals with nonmissing genotypes + major_ct = b + c + n_nomiss = n - a + c f = major_ct / (2 * n_nomiss) if n_nomiss > 0 else 0 - het_miss_ct = a + b - 2 * c # remove SNPs that are only either het or missing - if np.minimum(f, 1 - f) > mafMin and het_miss_ct < n: + het_miss_ct = a + b - 2 * c + if min(f, 1 - f) > maf_min and het_miss_ct < n: freq.append(f) - y += z + filtered_geno += z m_poly += 1 kept_snps.append(j) + return filtered_geno, m_poly, n, kept_snps, np.array(freq) - return (y, m_poly, n, kept_snps, freq) - - def nextSNPs(self, b, minorRef=None): + def next_snps(self, b: int, minor_ref: Optional[bool] = None) -> np.ndarray: """ - Unpacks the binary array of genotypes and returns an n x b matrix of floats of - normalized genotypes for the next b SNPs, where n := number of samples. - - Parameters - ---------- - b : int - Number of SNPs to return. - minorRef: bool, default None - Should we flip reference alleles so that the minor allele is the reference? - (This is useful for computing l1 w.r.t. minor allele). - - Returns - ------- - X : np.array with dtype float64 with shape (n, b), where n := number of samples - Matrix of genotypes normalized to mean zero and variance one. If minorRef is - not None, then the minor allele will be the positive allele (i.e., two copies - of the minor allele --> a positive number). + Retrieve the next b SNPs from the genotype data. - """ + Args: + b (int): Number of SNPs to retrieve. + minor_ref (Optional[bool]): Whether to flip reference alleles to the minor allele. - try: - b = int(b) - if b <= 0: - raise ValueError("b must be > 0") - except TypeError: - raise TypeError("b must be an integer") + Returns: + np.ndarray: Matrix of normalized genotypes (shape: (n, b)). - if self._currentSNP + b > self.m: - s = "{b} SNPs requested, {k} SNPs remain" - raise ValueError(s.format(b=b, k=(self.m - self._currentSNP))) + Raises: + ValueError: If b is not a positive integer or if insufficient SNPs remain. 
+ """ + if not isinstance(b, int) or b <= 0: + raise ValueError("b must be a positive integer.") + if self._current_snp + b > self.m: + remaining = self.m - self._current_snp + raise ValueError(f"{b} SNPs requested; only {remaining} SNPs remain.") - c = self._currentSNP + c = self._current_snp n = self.n nru = self.nru - slice = self.geno[2 * c * nru : 2 * (c + b) * nru] - X = np.array(list(slice.decode(self._bedcode)), dtype="float64").reshape((b, nru)).T - X = X[0:n, :] - Y = np.zeros(X.shape) - for j in range(0, b): - newsnp = X[:, j] - ii = newsnp != 9 - avg = np.mean(newsnp[ii]) - newsnp[np.logical_not(ii)] = avg - denom = np.std(newsnp) + slice_start = 2 * c * nru + slice_end = 2 * (c + b) * nru + geno_slice = self.geno[slice_start:slice_end] + X = np.array(list(geno_slice.decode(self._bedcode)), dtype="float64").reshape((b, nru)).T + X = X[:n, :] + Y = np.zeros_like(X) + for j in range(b): + snp = X[:, j] + valid_idx = snp != 9 + avg = np.mean(snp[valid_idx]) + snp[~valid_idx] = avg + denom = np.std(snp) if denom == 0: - denom = 1 - - if minorRef is not None and self.freq[self._currentSNP + j] > 0.5: - denom = denom * -1 - - Y[:, j] = (newsnp - avg) / denom - - self._currentSNP += b + denom = 1.0 + if minor_ref is not None and self.freq[self._current_snp + j] > 0.5: + denom *= -1 + Y[:, j] = (snp - avg) / denom + self._current_snp += b return Y diff --git a/make_annot.py b/make_annot.py index ef9653dd..b5452e34 100755 --- a/make_annot.py +++ b/make_annot.py @@ -42,7 +42,7 @@ def make_annot_files(args, bed_for_annot): df_annot.to_csv(args.annot_file, sep="\t", index=False) -if __name__ == "__main__": +def main(): parser = argparse.ArgumentParser() parser.add_argument("--gene-set-file", type=str, help="a file of gene names, one line per gene.") parser.add_argument( @@ -84,3 +84,7 @@ def make_annot_files(args, bed_for_annot): bed_for_annot = bed_for_annot.merge() make_annot_files(args, bed_for_annot) + + +if __name__ == "__main__": + main() diff --git a/munge_sumstats.py b/munge_sumstats.py index 0b11e06e..ccbbaf83 100755 --- a/munge_sumstats.py +++ b/munge_sumstats.py @@ -833,5 +833,9 @@ def munge_sumstats(args, p=True): log.log("Total time elapsed: {T}".format(T=sec_to_str(round(time.time() - START_TIME, 2)))) -if __name__ == "__main__": +def main(): munge_sumstats(parser.parse_args(), p=True) + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index de6544a5..c5123337 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,8 +17,9 @@ bitarray = "^3.0.0" nose = "^1.3.7" [tool.poetry.scripts] -ldsc = "ldscore.ldsc:main" -munge_sumstats = "ldscore.munge_sumstats:main" +ldsc = "ldsc:main" +munge_sumstats = "munge_sumstats:main" +make_annotation = "make_annot:main" [tool.poetry.group.dev.dependencies] pre-commit = "^4.0.1" diff --git a/test/test_ldscore.py b/test/test_ldscore.py index 8880fd32..f7c6c845 100644 --- a/test/test_ldscore.py +++ b/test/test_ldscore.py @@ -1,115 +1,198 @@ -import unittest +""" +Unit Tests for LD Score Calculation Module. + +This module contains unit tests for the ldscore.py module, specifically testing +the functions and classes related to LD score calculation using PLINK .bed files. + +Tests: + - test_get_block_lefts: Tests the get_block_lefts function. + - test_block_left_to_right: Tests the block_left_to_right function. + - TestPlinkBEDFile: Unit tests for the PlinkBEDFile class. + +Note: + Ensure that the test data files are located in the 'test/plink_test' directory. 
+ +(c) 2015 Brendan Bulik-Sullivan and Hilary Finucane +(c) 2024 Thomas Reimonn +""" + +from unittest import TestCase import bitarray as ba -import nose import numpy as np import ldscore.ldscore as ld import ldscore.parse as ps -def test_getBlockLefts(): - l = [ - (np.arange(1, 6), 5, np.zeros(5)), - (np.arange(1, 6), 0, np.arange(0, 5)), - ((1, 4, 6, 7, 7, 8), 2, (0, 1, 1, 2, 2, 2)), +def test_get_block_lefts(): + """ + Test the get_block_lefts function with various inputs. + """ + test_cases = [ + (np.arange(1, 6), 5, np.zeros(5, dtype=int)), + (np.arange(1, 6), 0, np.arange(5)), + (np.array([1, 4, 6, 7, 7, 8]), 2, np.array([0, 1, 1, 2, 2, 2])), ] - for coords, max_dist, correct in l: - assert np.all(ld.getBlockLefts(coords, max_dist) == correct) + for coords, max_dist, expected in test_cases: + result = ld.get_block_lefts(coords, max_dist) + assert np.array_equal(result, expected), f"Failed for coords={coords}, max_dist={max_dist}" def test_block_left_to_right(): - l = [ - ((0, 0, 0, 0, 0), (5, 5, 5, 5, 5)), - ((0, 1, 2, 3, 4, 5), (1, 2, 3, 4, 5, 6)), - ((0, 0, 2, 2), (2, 2, 4, 4)), + """ + Test the block_left_to_right function with various inputs. + """ + test_cases = [ + (np.array([0, 0, 0, 0, 0]), np.array([5, 5, 5, 5, 5])), + (np.array([0, 1, 2, 3, 4, 5]), np.array([1, 2, 3, 4, 5, 6])), + (np.array([0, 0, 2, 2]), np.array([2, 2, 4, 4])), ] - for block_left, correct_answer in l: - block_right = ld.block_left_to_right(block_left) - assert np.all(block_right == correct_answer) + for block_left, expected in test_cases: + result = ld.block_left_to_right(block_left) + assert np.array_equal(result, expected), f"Failed for block_left={block_left}" -class test_bed(unittest.TestCase): +class TestPlinkBEDFile(TestCase): + """ + Unit tests for the PlinkBEDFile class in ldscore.py. + """ def setUp(self): - self.M = 8 - self.N = 5 + """ + Set up the test environment by initializing necessary variables. + """ + self.m = 8 # Total number of SNPs in test data + self.n = 5 # Total number of individuals in test data self.bim = ps.PlinkBIMFile("test/plink_test/plink.bim") - def test_bed(self): - bed = ld.PlinkBEDFile("test/plink_test/plink.bed", self.N, self.bim) - # remove three monomorphic SNPs - print(bed.geno) - print(bed.m) - assert bed.m == 4 - # no individuals removed - print(bed.n) - assert self.N == bed.n - # 5 indivs * 4 polymorphic SNPs - print(len(bed.geno)) - assert len(bed.geno) == 64 - print(bed.freq) - correct = np.array([0.59999999999999998, 0.59999999999999998, 0.625, 0.625]) - assert np.all(bed.freq == correct) + def test_bed_initialization(self): + """ + Test the initialization of the PlinkBEDFile class. + """ + bed = ld.PlinkBEDFile("test/plink_test/plink.bed", self.n, self.bim) + # After filtering monomorphic SNPs, m should be 4 + self.assertEqual(bed.m, 4, "Number of SNPs after filtering should be 4.") + # No individuals should be removed + self.assertEqual(bed.n, self.n, "Number of individuals should remain unchanged.") + # Check the length of the genotype bitarray + expected_length = 2 * bed.m * bed.nru + self.assertEqual(len(bed.geno), expected_length, "Genotype bitarray length mismatch.") + # Check allele frequencies + expected_freq = np.array([0.6, 0.6, 0.625, 0.625]) + np.testing.assert_array_almost_equal( + bed.freq, expected_freq, err_msg="Allele frequencies do not match expected values." + ) def test_filter_snps(self): + """ + Test SNP filtering in PlinkBEDFile. 
+ """ keep_snps = [1, 4] - bed = ld.PlinkBEDFile("test/plink_test/plink.bed", self.N, self.bim, keep_snps=keep_snps) - assert bed.m == 1 - assert bed.n == 5 - # pad bits are initialized with random memory --> can't test them - assert bed.geno[0:10] == ba.bitarray("0001011111") + bed = ld.PlinkBEDFile("test/plink_test/plink.bed", self.n, self.bim, keep_snps=np.array(keep_snps)) + # Only SNP index 1 should remain after filtering (since SNP at index 4 is monomorphic) + self.assertEqual(bed.m, 1, "Number of SNPs after filtering should be 1.") + self.assertEqual(bed.n, self.n, "Number of individuals should remain unchanged.") + # Test the genotype bitarray (cannot test pad bits) + expected_bits = ba.bitarray("0001011111") + self.assertEqual( + bed.geno[0:10], + expected_bits, + "Genotype bitarray does not match expected values after SNP filtering.", + ) - def test_filter_indivs(self): + def test_filter_individuals(self): + """ + Test individual filtering in PlinkBEDFile. + """ keep_indivs = [0, 1] - bed = ld.PlinkBEDFile("test/plink_test/plink.bed", self.N, self.bim, keep_indivs=keep_indivs) - assert bed.m == 2 - assert bed.n == 2 - # pad bits are initialized with random memory --> can't test them - assert bed.geno[0:4] == ba.bitarray("0001") - assert bed.geno[8:12] == ba.bitarray("0001") - - def test_filter_indivs_and_snps(self): + bed = ld.PlinkBEDFile("test/plink_test/plink.bed", self.n, self.bim, keep_indivs=np.array(keep_indivs)) + self.assertEqual(bed.m, 2, "Number of SNPs should be 2 after filtering monomorphic SNPs.") + self.assertEqual(bed.n, 2, "Number of individuals after filtering should be 2.") + # Test the genotype bitarray (cannot test pad bits) + expected_bits_snp1 = ba.bitarray("0001") + expected_bits_snp2 = ba.bitarray("0001") + self.assertEqual( + bed.geno[0:4], + expected_bits_snp1, + "Genotype bitarray for SNP 1 does not match expected values after individual filtering.", + ) + self.assertEqual( + bed.geno[8:12], + expected_bits_snp2, + "Genotype bitarray for SNP 2 does not match expected values after individual filtering.", + ) + + def test_filter_individuals_and_snps(self): + """ + Test simultaneous SNP and individual filtering in PlinkBEDFile. + """ keep_indivs = [0, 1] keep_snps = [1, 5] bed = ld.PlinkBEDFile( "test/plink_test/plink.bed", - self.N, + self.n, self.bim, - keep_snps=keep_snps, - keep_indivs=keep_indivs, + keep_snps=np.array(keep_snps), + keep_indivs=np.array(keep_indivs), + ) + # Only SNP at index 1 should remain after filtering + self.assertEqual(bed.m, 1, "Number of SNPs after filtering should be 1.") + self.assertEqual(bed.n, 2, "Number of individuals after filtering should be 2.") + expected_bits = ba.bitarray("0001") + self.assertEqual( + bed.geno[0:4], + expected_bits, + "Genotype bitarray does not match expected values after filtering.", ) - assert bed.m == 1 - assert bed.n == 2 - print(bed.geno) - assert bed.geno[0:4] == ba.bitarray("0001") - @nose.tools.raises(ValueError) def test_bad_filename(self): - bed = ld.PlinkBEDFile("test/plink_test/plink.bim", 9, self.bim) - - @nose.tools.raises(ValueError) - def test_nextSNPs_errors1(self): - bed = ld.PlinkBEDFile("test/plink_test/plink.bed", self.N, self.bim) - bed.nextSNPs(0) - - @nose.tools.raises(ValueError) - def test_nextSNPs_errors2(self): - bed = ld.PlinkBEDFile("test/plink_test/plink.bed", self.N, self.bim) - bed.nextSNPs(5) - - def test_nextSNPs(self): + """ + Test error handling when an incorrect filename is provided. 
+ """ + with self.assertRaises(ValueError): + ld.PlinkBEDFile("test/plink_test/plink.bim", self.n, self.bim) + + def test_next_snps_errors(self): + """ + Test error handling in the next_snps method. + """ + bed = ld.PlinkBEDFile("test/plink_test/plink.bed", self.n, self.bim) + with self.assertRaises(ValueError): + bed.next_snps(0) + with self.assertRaises(ValueError): + bed.next_snps(5) # Requesting more SNPs than available + + def test_next_snps(self): + """ + Test the next_snps method for retrieving SNPs. + """ for b in [1, 2, 3]: - bed = ld.PlinkBEDFile("test/plink_test/plink.bed", self.N, self.bim) - x = bed.nextSNPs(b) - assert x.shape == (5, b) - assert np.all(np.abs(np.mean(x, axis=0)) < 0.01) - assert np.all(np.abs(np.std(x, axis=0) - 1) < 0.01) - - def test_nextSNPs_maf_ref(self): + bed = ld.PlinkBEDFile("test/plink_test/plink.bed", self.n, self.bim) + x = bed.next_snps(b) + self.assertEqual(x.shape, (self.n, b), f"Shape of SNP matrix should be ({self.n}, {b}).") + np.testing.assert_array_almost_equal( + np.mean(x, axis=0), + np.zeros(b), + decimal=2, + err_msg="Mean of SNP matrix columns should be approximately zero.", + ) + np.testing.assert_array_almost_equal( + np.std(x, axis=0), + np.ones(b), + decimal=2, + err_msg="Standard deviation of SNP matrix columns should be approximately one.", + ) + + def test_next_snps_minor_ref(self): + """ + Test the next_snps method with minor allele as the reference. + """ b = 4 - bed = ld.PlinkBEDFile("test/plink_test/plink.bed", self.N, self.bim) - x = bed.nextSNPs(b) - bed._currentSNP -= b - y = bed.nextSNPs(b, minorRef=True) - assert np.all(x == -y) + bed = ld.PlinkBEDFile("test/plink_test/plink.bed", self.n, self.bim) + x = bed.next_snps(b) + bed._current_snp -= b # Reset the current SNP index + y = bed.next_snps(b, minor_ref=True) + np.testing.assert_array_almost_equal( + x, -y, decimal=5, err_msg="SNP matrices should be negatives of each other." 
+ ) diff --git a/test/test_munge_sumstats.py b/test/test_munge_sumstats.py index a5645723..13f4d8e1 100644 --- a/test/test_munge_sumstats.py +++ b/test/test_munge_sumstats.py @@ -1,4 +1,4 @@ -import unittest +from unittest import TestCase import nose import numpy as np @@ -25,7 +25,7 @@ def log(self, x): args = munge.parser.parse_args("") -class test_p_to_z(unittest.TestCase): +class test_p_to_z(TestCase): def setUp(self): self.N = pd.Series([1, 2, 3]) @@ -36,7 +36,7 @@ def test_p_to_z(self): assert_allclose(munge.p_to_z(self.P, self.N), self.Z, atol=1e-5) -class test_check_median(unittest.TestCase): +class test_check_median(TestCase): def setUp(self): self.x = pd.Series([1, 2, 3]) @@ -49,7 +49,7 @@ def test_bad_median(self): nose.tools.assert_raises(ValueError, munge.check_median, self.x, 0, 0.1, "TEST") -class test_process_n(unittest.TestCase): +class test_process_n(TestCase): def setUp(self): self.dat = pd.DataFrame(["rs1", "rs2", "rs3"], columns=["SNP"]) @@ -136,7 +136,7 @@ def test_filter_alleles(): assert_series_equal(x, y) -class test_allele_merge(unittest.TestCase): +class test_allele_merge(TestCase): def setUp(self): self.dat = pd.DataFrame(np.transpose([["a", "b", "c"], ["A", "T", "C"], ["C", "G", "A"]])) @@ -152,7 +152,7 @@ def test_merge(self): assert_frame_equal(x, answer) -class test_parse_dat(unittest.TestCase): +class test_parse_dat(TestCase): def setUp(self): dat = pd.DataFrame() @@ -216,7 +216,7 @@ def test_get_compression_gzip(): nose.tools.eq_(x, None) -class test_parse_flag_cnames(unittest.TestCase): +class test_parse_flag_cnames(TestCase): def setUp(self): self.args = munge.parser.parse_args("") @@ -258,7 +258,7 @@ def test_sign_error(self): nose.tools.assert_raises(ValueError, munge.parse_flag_cnames, log, self.args) -class test_cname_map(unittest.TestCase): +class test_cname_map(TestCase): def setUp(self): pass @@ -282,7 +282,7 @@ def test_ignore(self): self.assertEqual(x["N"], "FOOBAR") -class test_end_to_end(unittest.TestCase): +class test_end_to_end(TestCase): def setUp(self): self.args = munge.parser.parse_args("") diff --git a/test/test_parse.py b/test/test_parse.py index 983a5a5f..e1d340bc 100644 --- a/test/test_parse.py +++ b/test/test_parse.py @@ -1,5 +1,5 @@ import os -import unittest +from unittest import TestCase import numpy as np import pandas as pd @@ -55,7 +55,7 @@ def test_frq_parser(): assert_array_equal(x.FRQ, [0.01, 0.1, 0.3, 0.2, 0.2, 0.2, 0.01, 0.03]) -class Test_ldscore(unittest.TestCase): +class Test_ldscore(TestCase): def test_ldscore(self): x = ps.ldscore(os.path.join(DIR, "parse_test/test")) @@ -79,7 +79,7 @@ def test_ldscore_fromlist(self): assert_raises(ValueError, ps.ldscore_fromlist, [fh, os.path.join(DIR, "parse_test/test2")]) -class Test_M(unittest.TestCase): +class Test_M(TestCase): def test_bad_M(self): assert_raises(ValueError, ps.M, os.path.join(DIR, "parse_test/test_bad")) @@ -101,7 +101,7 @@ def test_M_fromlist(self): assert_array_equal(x, np.hstack((ps.M(fh), ps.M(fh)))) -class Test_Fam(unittest.TestCase): +class Test_Fam(TestCase): def test_fam(self): fam = ps.PlinkFAMFile(os.path.join(DIR, "plink_test/plink.fam")) @@ -113,7 +113,7 @@ def test_bad_filename(self): assert_raises(ValueError, ps.PlinkFAMFile, os.path.join(DIR, "plink_test/plink.bim")) -class Test_Bim(unittest.TestCase): +class Test_Bim(TestCase): def test_bim(self): bim = ps.PlinkBIMFile(os.path.join(DIR, "plink_test/plink.bim")) diff --git a/test/test_regressions.py b/test/test_regressions.py index b1c45e04..638c7212 100644 --- a/test/test_regressions.py +++ 
b/test/test_regressions.py @@ -1,4 +1,4 @@ -import unittest +from unittest import TestCase import nose import numpy as np @@ -49,7 +49,7 @@ def test_remove_brackets(): nose.tools.assert_equal(reg.remove_brackets(x), "asdf") -class Test_h2_obs_to_liab(unittest.TestCase): +class Test_h2_obs_to_liab(TestCase): def test_bad_data(self): assert_raises(ValueError, reg.h2_obs_to_liab, 1, 1, 0.5) @@ -63,7 +63,7 @@ def test_approx_scz(self): assert_array_almost_equal(x, 0.551907298063) -class Test_gencov_obs_to_liab(unittest.TestCase): +class Test_gencov_obs_to_liab(TestCase): def test_qt(self): self.assertEqual(reg.gencov_obs_to_liab(1, None, None, None, None), 1) @@ -77,7 +77,7 @@ def test_approx_scz(self): assert_array_almost_equal(x, 0.551907298063) -class Test_Hsq_1D(unittest.TestCase): +class Test_Hsq_1D(TestCase): def setUp(self): self.chisq = np.ones((4, 1)) * 4 @@ -133,7 +133,7 @@ def test_aggregate(self): assert_array_almost_equal(agg, 0) -class Test_Coef(unittest.TestCase): +class Test_Coef(TestCase): def setUp(self): self.hsq1 = 0.2 @@ -180,7 +180,7 @@ def test_intercept(self): assert_array_almost_equal(self.hsq_int.ratio, 0) -class Test_Hsq_2D(unittest.TestCase): +class Test_Hsq_2D(TestCase): def setUp(self): self.chisq = np.ones((17, 1)) * 4 @@ -203,7 +203,7 @@ def test_summary(self): hsq.summary(["asdf", "qwer"]) -class Test_Gencov_1D(unittest.TestCase): +class Test_Gencov_1D(TestCase): def setUp(self): self.z1 = np.ones((4, 1)) * 4 @@ -278,7 +278,7 @@ def test_aggregate(self): assert_array_almost_equal(agg, 0) -class Test_Gencov_2D(unittest.TestCase): +class Test_Gencov_2D(TestCase): def setUp(self): self.ld = np.abs(np.random.normal(size=100).reshape((50, 2))) + 2 @@ -352,7 +352,7 @@ def test_eq_hsq(self): assert_array_almost_equal(gencov.tot_cov, hsq.tot_cov) -class Test_RG_2D(unittest.TestCase): +class Test_RG_2D(TestCase): def setUp(self): self.ld = np.abs(np.random.normal(size=100).reshape((50, 2))) + 2 @@ -387,7 +387,7 @@ def test_rg(self): assert np.abs(self.rg.rg_ratio + 1) < 0.01 -class Test_RG_Bad(unittest.TestCase): +class Test_RG_Bad(TestCase): def test_negative_h2(self): ld = np.arange(50).reshape((50, 1)) + 0.1 diff --git a/test/test_sumstats.py b/test/test_sumstats.py index 2ec172e0..746ef39f 100644 --- a/test/test_sumstats.py +++ b/test/test_sumstats.py @@ -11,8 +11,7 @@ """ import os -import unittest -from typing import Any, List +from unittest import TestCase import numpy as np import pandas as pd @@ -63,7 +62,7 @@ def get_attr(attr: str): return lambda obj: getattr(obj, attr, float("nan")) -class TestSumstatsFunctions(unittest.TestCase): +class TestSumstatsFunctions(TestCase): """ Unit tests for individual functions in sumstats.py. """ @@ -261,7 +260,7 @@ def test_strand_ambiguous(self): @attr("rg") @attr("slow") -class TestGeneticCorrelationStatistical(unittest.TestCase): +class TestGeneticCorrelationStatistical(TestCase): """ Statistical tests for genetic correlation estimation. """ @@ -318,7 +317,7 @@ def test_rg_se_no_intercept(self): @attr("h2") @attr("slow") -class TestHeritabilityStatistical(unittest.TestCase): +class TestHeritabilityStatistical(TestCase): """ Statistical tests for heritability estimation. """ @@ -379,7 +378,7 @@ def test_total_heritability_se_no_intercept(self): # Additional tests for category-specific heritability and other statistics can be added here. -class TestEstimateFunctions(unittest.TestCase): +class TestEstimateFunctions(TestCase): """ Tests for the estimate_h2 and estimate_rg functions. """
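Beyond the diff itself, a minimal sketch of driving the refactored `IRWLS` API from `ldscore/irwls.py` may help reviewers. The synthetic data and the constant-weight `update_func` below are illustrative assumptions, not code from this PR; real LD Score regression derives its weights from the fitted values:

```python
"""Usage sketch for the refactored IRWLS class (synthetic data, not part of the PR)."""
import numpy as np

from ldscore.irwls import IRWLS

rng = np.random.default_rng(0)
n_samples, n_features = 200, 2

# Simulate a simple linear model y = X @ beta + noise
X = rng.normal(size=(n_samples, n_features))
beta = np.array([[0.5], [2.0]])
y = X @ beta + rng.normal(size=(n_samples, 1))


def update_func(lstsq_output):
    # Receives the raw np.linalg.lstsq tuple and must return weights of
    # shape (n_samples, 1); constant weights here, purely to exercise
    # the plumbing.
    return np.ones((n_samples, 1))


model = IRWLS(X, y, update_func, n_blocks=10)
print(model.est)        # IRWLS point estimate
print(model.jknife_se)  # block-jackknife standard errors
```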