fixing calculation of average #723

Merged

Commits (46):
638199f fixing calculation of average (grzanka, Mar 15, 2024)
8cf2439 more comments added (grzanka, Mar 15, 2024)
5024e18 weighted mean (grzanka, Mar 19, 2024)
a89b352 averaging (grzanka, Mar 19, 2024)
5cc22a6 fixes (grzanka, Mar 19, 2024)
f36aa26 update (grzanka, Mar 19, 2024)
c58e475 documentation (grzanka, Mar 21, 2024)
c411ca2 fix (grzanka, Mar 21, 2024)
ba578f0 fixes (grzanka, Mar 21, 2024)
4b63990 fix (grzanka, Mar 21, 2024)
b026bd1 drop python 3.8 (grzanka, Mar 21, 2024)
51428b6 fixes (grzanka, Mar 22, 2024)
bce1299 fixes (grzanka, Mar 22, 2024)
40ae307 trying to fix mac (grzanka, Mar 22, 2024)
6adfe62 fix updating (grzanka, Mar 22, 2024)
d8028b4 test migration (grzanka, Mar 22, 2024)
80ea0d4 fixes (grzanka, Mar 22, 2024)
91cadac deepsource (grzanka, Mar 22, 2024)
68e6e01 tests fixes (grzanka, Mar 22, 2024)
274b35f deepsource (grzanka, Mar 22, 2024)
d6c5ac3 deepsource (grzanka, Mar 22, 2024)
a05f853 weights included (grzanka, Mar 23, 2024)
9497eb2 Fix stddev calculations (grzanka, Mar 25, 2024)
5ee462e tests (grzanka, Mar 25, 2024)
4f96a56 fix docs (grzanka, Mar 25, 2024)
e59cebb fix for std err calculations (grzanka, Mar 25, 2024)
5b14d44 deepsource (grzanka, Mar 25, 2024)
743c729 tests update (grzanka, Mar 25, 2024)
fef3acc BDO files for averaging tests (grzanka, Mar 25, 2024)
a74d7c4 deepsource (grzanka, Mar 25, 2024)
f818e5c unit tests (grzanka, Mar 25, 2024)
0fcba75 test improve (grzanka, Mar 25, 2024)
64b05a7 errors added (grzanka, Mar 25, 2024)
8f16763 deepsource (grzanka, Mar 25, 2024)
0f97a56 fix (grzanka, Mar 25, 2024)
a35a484 force gc to decrease memory usage (grzanka, Mar 26, 2024)
93cfda1 deepsource (grzanka, Mar 26, 2024)
f82d04e review comments (grzanka, Mar 26, 2024)
acc647b more documentation (grzanka, Mar 26, 2024)
26ec2cd deepsource (grzanka, Mar 26, 2024)
bb1027d deepsource (grzanka, Mar 26, 2024)
91bea99 not needed return removed (grzanka, Mar 26, 2024)
52947dc more comments (grzanka, Mar 26, 2024)
ef2f074 Adjust output units for normalized quantities (grzanka, Mar 26, 2024)
faa4b5a documentation reformat (grzanka, Mar 27, 2024)
1e6bac5 Merge branch 'master' into 722-check-if-weighted-average-from-multipl… (grzanka, Mar 27, 2024)
19 changes: 8 additions & 11 deletions .github/workflows/python-app.yml
@@ -15,7 +15,7 @@ jobs:
if: >
!contains(github.event.head_commit.message, '[ci skip]') &&
!contains(github.event.head_commit.message, '[skip ci]')
runs-on: ubuntu-latest
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
@@ -24,7 +24,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.10'
python-version: '3.11'

- name: Install dependencies
run: |
Expand All @@ -38,18 +38,16 @@ jobs:
flake8 --count --select=E9,F63,F7,F82 --show-source --statistics pymchelper tests examples

- name: Smoke test with pytest
run: |
pytest -k "smoke" tests/
run: |
pytest -k "smoke"

# all tests on matrix of all possible python versions and OSes
normal_test:
strategy:
matrix:
python-version: ['3.8', '3.9', '3.10', '3.11']
python-version: ['3.9', '3.10', '3.11']
platform: [ubuntu-latest, macos-latest, windows-latest]
exclude:
- platform: macos-latest
python-version: '3.8'
- platform: macos-latest
python-version: '3.10'
- platform: macos-latest
@@ -73,6 +71,5 @@ jobs:
pip install -r tests/requirements-test.txt

- name: Test with pytest
run: |
pytest -k "not slow" tests/

run: |
pytest -k "not slow"
26 changes: 13 additions & 13 deletions .github/workflows/release-pip.yml
@@ -15,7 +15,7 @@ jobs:
full_test:
strategy:
matrix:
python-version: ['3.8', '3.9', '3.10', '3.11']
python-version: ['3.9', '3.10', '3.11']
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
@@ -32,11 +32,11 @@ jobs:
pip install -r tests/requirements-test.txt

- name: Run all test with pytest
run: |
pytest -k "slow" tests/
run: |
pytest -k "slow"

- name: Check images generation for documentation
run: |
run: |
cd docs/images_generation && ./run.sh

# test if package generation works and optional package upload to pypi (only on release)
@@ -47,24 +47,24 @@ jobs:
- uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.10'
python-version: '3.11'

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt

- name: Make wheel package and validate it
run: |
run: |
pip install wheel twine

# first call to version method would generate VERSION file
PYTHONPATH=. python pymchelper/run.py --version

python setup.py bdist_wheel

twine check dist/*.whl
@@ -99,7 +99,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.10'
python-version: '3.11'
- name: Build and Commit
uses: sphinx-notes/[email protected]
with:
4 changes: 2 additions & 2 deletions .github/workflows/release-win-exe.yml
@@ -26,13 +26,13 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: 3.10
python-version: 3.11

- name: Generate VERSION file
run: python3 setup.py --help

- name: Install pyinstaller
run: pip install pyinstaller
run: pip install pyinstaller

- name: Prepare requierements
run: pip install -r requirements.txt
214 changes: 214 additions & 0 deletions pymchelper/averaging.py
@@ -0,0 +1,214 @@
"""
Output from the simulation running on multiple processess needs to be agregated.
The simplest way of doing so is to calculated the average of the data.
There are however more sophisticated cases: COUNT scorers needs to be summed up, not averaged.
Phase space data needs to be concatenated, not averaged.
Each of the parallel jobs can have different number of histories, so the weighted average needs to be calculated.
Moreover, in case averaging is employed, we can estimate spread of the data as standard deviation or standard error.

The binary output files from each job may be quite large (even ~GB for scoring in fine 3D mesh), to obtain good
statistics we sometimes parallelise the simulation of hundreds or thousands of jobs (when using HPC clusters).
In such case it's not feasible to load all the data into memory and calculate the average in one go, using standard
functions from numpy library. Instead, we need to calculate the average in an online manner,
i.e. by updating the state of respective aggregator object with each new binary output file read.

Such approach results in a significant reduction of memory usage and is more numerically stable.

This module contains several classes for aggregating data from multiple files:
- Aggregator: base class for all other aggregators
- WeightedStatsAggregator: for calculating weighted (using reliability, not frequency weights) mean and variance
nbassler marked this conversation as resolved.
Show resolved Hide resolved
- ConcatenatingAggregator: for concatenating data
- SumAggregator: for calculating sum instead of variance
- NoAggregator: for cases when no aggregation is required
nbassler marked this conversation as resolved.
Show resolved Hide resolved

All aggregators have `data` and `error` property, which can be used to obtain the result of the aggregation.
The `data` property returns the result of the aggregation: mean, sum or concatenated array.
The `error` property returns the spread of data for WeightedStatsAggregator, and `None` for other aggregators.

The `update` method is used to update the state of the aggregator with new data from the file.
"""

from dataclasses import dataclass, field
import logging
from typing import Union
import numpy as np
from numpy.typing import ArrayLike


@dataclass
class Aggregator:
"""
Base class for all aggregators.
The `data` property returns the result of the aggregation, needs to be implemented in derived classes.
The `error` function returns the spread of data, can be implemented in derived classes. It's a function,
not a property as different type of error can be calculated (standard deviation, standard error, etc.).
Type of errors may be then passed in optional keyword arguments `**kwargs`.
"""

data: Union[float, ArrayLike] = float('nan')
_updated: bool = field(default=False, repr=False, init=False)

def update(self, value: Union[float, ArrayLike], **kwargs):
"""Update the state of the aggregator with new data."""
raise NotImplementedError(f"Update function not implemented for {self.__class__.__name__}")

def error(self, **kwargs):
"""Default implementation of error function, returns None."""
logging.debug("Error calculation not implemented for %s", self.__class__.__name__)

@property
def updated(self):
"""Check if the aggregator was updated."""
if isinstance(self.data, float) and np.isnan(self.data):
return False
return self._updated


@dataclass
class WeightedStatsAggregator(Aggregator):
"""
Calculates weighted mean and variance of a sequence of numbers or numpy arrays.
We do not use frequency weights (which sums up to 1), the total sum of all weights is not known as
we are aggregating data from multiple files with different number of histories.
The aggregation uses single pass loop over all files.

Good overview of currently known methods to calculate online weighted mean and variance can be found in [2].
The original method to calculate online mean and variance was proposed by Welford in [1].
The weighed version of this algoritm is nicely illustrated in [3].
Here we employ algoritm proposed by West in [4] and descibed in Wikipedia [5].

[1] Welford, B. P. (1962). "Note on a method for calculating corrected sums of squares and products".
Technometrics. 4 (3): 419–420.
[2] Schubert, Erich, and Michael Gertz. "Numerically stable parallel computation of (co-) variance."
Proceedings of the 30th international conference on scientific and statistical database management. 2018.
[3] https://justinwillmert.com/posts/2022/notes-on-calculating-online-statistics/
[4] West, D. H. D. (1979). "Updating Mean and Variance Estimates: An Improved Method".
Communications of the ACM. 22 (9): 532-535.
[5] https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Weighted_incremental_algorithm
"""

data: Union[float, ArrayLike] = float('nan')

_accumulator_S: Union[float, ArrayLike] = field(default=float('nan'), repr=False, init=False)
_total_weight_squared: float = field(default=0., repr=False, init=False)
total_weight: float = 0

def update(self, value: Union[float, ArrayLike], weight: float = 1.0, **kwargs):
"""
Update the state of the aggregator with new data.
Note that the weights are so called "reliability weights", not frequency weights.
If unsure put here the number of histories from the file if you are aggregating data from multiple files.
"""
if weight < 0:
raise ValueError("Weight must be non-negative")

# first pass initialization
if not self.updated:
self.data = value * 0
self._accumulator_S = value * 0

# W_n = W_{n-1} + w_n
self.total_weight += weight
self._total_weight_squared += weight**2

mean_old = self.data
# mu_n = (1 - w_n / W_n) * mu_{n-1} + (w_n / W_n) * x_n
# or in other words:
# mu_n - mu_{n-1} = (w_n / W_n) * (x_n - mu_{n-1})
self.data += (weight / self.total_weight) * (value - mean_old)

self._accumulator_S += weight * (value - self.data) * (value - mean_old)

self._updated = True
logging.debug("Updated aggregator with value %s and weight %s", value, weight)

@property
def mean(self):
"""Weighted mean of the sample"""
return self.data

@property
def variance_population(self):
"""Biased estimate of the variance"""
return self._accumulator_S / self.total_weight

@property
def variance_sample(self):
"""
Unbiased estimate of the variance.
The bias of the weighted estimator if (1 - sum w_i^2 / W_n^2), or in other words:
1 - "sum of squares of weights" / "square of sum of weights".
For all equal weights the bias is 1 - n * w^2/((n * w)^2) = 1 - 1/n which
leads to the well known formula for the sample variance.
Here we use the weighted version of the formula.
"""
return self._accumulator_S / (self.total_weight - (self._total_weight_squared / self.total_weight))

@property
def stddev(self):
"""Standard deviation of the sample"""
return np.sqrt(self.variance_sample)

@property
def stderr(self):
"""
Standard error of the sample.
For weighted data it is calculated as:
stddev * sqrt(sum w_i^2) / sum w_i
For equal weights it reduces via sqrt( n * w^2) / (n * w) = stddev / sqrt(n)
"""
return self.stddev * np.sqrt(self._total_weight_squared) / self.total_weight

def error(self, **kwargs):
"""Error calculation function, can be used to calculate standard deviation or standard error."""
logging.debug("Calculating error with kwargs: %s", kwargs)
if 'error_type' in kwargs:
if kwargs['error_type'] == 'stddev':
return self.stddev
if kwargs['error_type'] == 'stderr':
return self.stderr
return None


@dataclass
class ConcatenatingAggregator(Aggregator):
"""Class for concatenating numpy arrays"""

def update(self, value: Union[float, ArrayLike], **kwargs):
"""Update the state of the aggregator with new data."""
if not self.updated:
self.data = value
else:
self.data = np.concatenate((self.data, value))
self._updated = True


@dataclass
class SumAggregator(Aggregator):
"""Class for calculating sum of a sequence of numbers."""

def update(self, value: Union[float, ArrayLike], **kwargs):
"""Update the state of the aggregator with new data."""
# first value added
if not self.updated:
self.data = value
# subsequent values added
else:
self.data += value
self._updated = True


@dataclass
class NoAggregator(Aggregator):
"""
Class for cases when no aggregation is required.
Sets the data to the first value and does not update it.
"""

def update(self, value: Union[float, ArrayLike], **kwargs):
"""Update the state of the aggregator with new data."""
# set value only on first update
if not self.updated:
logging.debug("Setting data to %s", value)
self.data = value
self._updated = True
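The weighted update rule added in this file can be sanity-checked against a one-shot numpy computation. Below is a minimal standalone sketch of West's incremental algorithm, the same recurrence as in `WeightedStatsAggregator.update`; the function name `west_update` and the sample numbers are illustrative, not taken from the module:

```python
import numpy as np


def west_update(mean, s, w_sum, w_sq_sum, x, w):
    """One step of West's weighted incremental algorithm.

    `s` accumulates the weighted sum of squared deviations; dividing it
    by the total weight gives the (biased) population variance.
    """
    w_sum += w
    w_sq_sum += w ** 2
    mean_old = mean
    mean = mean_old + (w / w_sum) * (x - mean_old)
    s = s + w * (x - mean) * (x - mean_old)
    return mean, s, w_sum, w_sq_sum


# aggregate a small sample one value at a time, as if reading one file after another
values = [1.0, 2.0, 4.0, 8.0]
weights = [100.0, 200.0, 100.0, 50.0]  # e.g. number of histories per file

mean, s, w_sum, w_sq = 0.0, 0.0, 0.0, 0.0
for x, w in zip(values, weights):
    mean, s, w_sum, w_sq = west_update(mean, s, w_sum, w_sq, x, w)

# reference values computed in a single pass with numpy
ref_mean = np.average(values, weights=weights)
ref_var_pop = np.average((np.array(values) - ref_mean) ** 2, weights=weights)

# unbiased (reliability-weight) variance with the correction from the docstring
var_sample = s / (w_sum - w_sq / w_sum)
```

The online `mean` matches `np.average` and `s / w_sum` matches the weighted population variance, without ever holding more than one value in memory at a time.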
2 changes: 1 addition & 1 deletion pymchelper/estimator.py
@@ -153,5 +153,5 @@ def average_with_nan(estimator_list, error_estimate=ErrorEstimate.stderr):
page.error_raw /= np.sqrt(result.file_counter) # np.sqrt() always returns np.float64
else:
for page in result.pages:
page.error_raw = np.zeros_like(page.data_raw)
page.error_raw = None
return result
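The stderr formula from the new averaging module reduces to the familiar stddev / sqrt(n) when all weights are equal, which is consistent with the `np.sqrt(result.file_counter)` scaling kept in `average_with_nan`. A quick numeric sketch with made-up per-file means:

```python
import numpy as np

# per-file means from n hypothetical, equally weighted jobs (made-up numbers)
per_file_means = np.array([3.1, 2.9, 3.0, 3.2, 2.8])
n = per_file_means.size

stddev = per_file_means.std(ddof=1)  # unbiased sample standard deviation
stderr = stddev / np.sqrt(n)         # standard error of the mean

# weighted formula from averaging.py with all weights equal to w:
# stddev * sqrt(sum w_i^2) / sum w_i = stddev * sqrt(n * w^2) / (n * w)
w = 7.0
weighted_stderr = stddev * np.sqrt(n * w ** 2) / (n * w)
```

For any constant weight `w`, the factor sqrt(n * w^2) / (n * w) simplifies to 1 / sqrt(n), so both expressions agree.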
12 changes: 9 additions & 3 deletions pymchelper/executor/runner.py
@@ -14,7 +14,8 @@
from pymchelper.executor.options import SimulationSettings

from pymchelper.simulator_type import SimulatorType
from pymchelper.input_output import frompattern, get_topas_estimators
from pymchelper.readers.topas import get_topas_estimators
from pymchelper.input_output import frompattern


class OutputDataType(IntEnum):
@@ -30,8 +31,12 @@ class Runner:
Main class responsible for configuring and starting multiple parallel MC simulation processes.
It can be used to access combined averaged results of the simulation.
"""
def __init__(self, settings: SimulationSettings, jobs: int=None,
keep_workspace_after_run: bool=False, output_directory: str='.'):

def __init__(self,
settings: SimulationSettings,
jobs: int = None,
keep_workspace_after_run: bool = False,
output_directory: str = '.'):
self.settings = settings

# create pool of processes, waiting to be started by run method
@@ -234,6 +239,7 @@ class WorkspaceManager:
A workspace consists of multiple working directories (i.e. run_1, run_2),
one for each of the parallel simulation runs.
"""

def __init__(self, output_directory='.', keep_workspace_after_run=False):
self.output_dir_absolute_path = os.path.abspath(output_directory)
self.keep_workspace_after_run = keep_workspace_after_run