Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add class to compute statistics on the fly #75

Merged
merged 5 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 169 additions & 0 deletions src/nectarchain/tests/test_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import pytest

def test_stats_simple():
import numpy as np
from nectarchain.utils.stats import Stats

s = Stats()
s.add(1)
s.add(2)
s.add(3)

np.testing.assert_allclose( s.mean, np.array([2.]) )
np.testing.assert_allclose( s.std, np.array([1.]) )
np.testing.assert_allclose( s.max, np.array([3.]))
np.testing.assert_allclose( s.min, np.array([1.]))
np.testing.assert_allclose( s.count, np.array([3]))

def test_stats_camera():
import numpy as np
from nectarchain.utils.stats import Stats

s = Stats( shape = (5) )
s.add( np.array([0,0,0,0,0]), validmask = np.array( [False,False,False,False,False] ) )
s.add( np.array([0,1,2,3,4]), validmask = np.array( [True,True,True,True,True] ) )
s.add( np.array([1,2,3,4,5]), validmask = np.array( [True,True,True,False,False] ) )
s.add( np.array([2,3,4,5,6]), validmask = None )
s.add( np.array([3,4,5,6,7]), validmask = np.array( [False,False,False,False,False] ) )

np.testing.assert_allclose( s.count, np.array([3,3,3,2,2]) )
np.testing.assert_allclose( s.mean, np.array([1,2,3,4,5]) )
np.testing.assert_allclose( s.variance, np.array([1,1,1,2,2]) )
np.testing.assert_allclose( s.min, np.array([0,1,2,3,4]) )
np.testing.assert_allclose( s.max, np.array([2,3,4,5,6]) )

def test_stats_copy():
import numpy as np
from nectarchain.utils.stats import Stats, CameraStats, CameraSampleStats

a = Stats()
assert id(a) != id(a.copy())

a = CameraStats()
assert id(a) != id(a.copy())

a = CameraSampleStats()
assert id(a) != id(a.copy())

def test_stats_lowcounts():

import numpy as np
from nectarchain.utils.stats import Stats

s = Stats( shape = (5) )
s.add( np.array([0,0,0,0,0]), validmask = np.array( [True,True,True,True,False] ))
s.add( np.array([1,1,1,1,1]))
s.add( np.array([2,2,2,2,2]))

np.testing.assert_array_equal( s.get_lowcount_mask(3), np.array([False,False,False,False,True]) )


def test_stats_merge():
import numpy as np
from nectarchain.utils.stats import Stats

s = Stats( shape = (5) )
s.add( np.array([0,0,0,0,0]), validmask = np.array( [False,False,False,False,False] ) )
s.add( np.array([0,1,2,3,4]), validmask = np.array( [True,True,True,True,True] ) )

s2 = Stats( shape = (5) )
s2.add( np.array([1,2,3,4,5]), validmask = np.array( [True,True,True,False,False] ) )
s2.add( np.array([2,3,4,5,6]), validmask = None )
s2.add( np.array([3,4,5,6,7]), validmask = np.array( [False,False,False,False,False] ) )

s.merge(s2)

np.testing.assert_allclose( s.count, np.array([3,3,3,2,2]) )
np.testing.assert_allclose( s.mean, np.array([1,2,3,4,5]) )
np.testing.assert_allclose( s.variance, np.array([1,1,1,2,2]) )
np.testing.assert_allclose( s.min, np.array([0,1,2,3,4]) )
np.testing.assert_allclose( s.max, np.array([2,3,4,5,6]) )


def test_stats_merge2():
import numpy as np
from nectarchain.utils.stats import Stats

s = Stats( shape = (5) )
s.add( np.array([0,0,0,0,0]), validmask = np.array( [False,False,False,False,False] ) )
s.add( np.array([0,1,2,3,4]), validmask = np.array( [True,True,True,True,True] ) )

s2 = Stats( shape = (5) )
s2.add( np.array([1,2,3,4,5]), validmask = np.array( [True,True,True,False,False] ) )
s2.add( np.array([2,3,4,5,6]), validmask = None )
s2.add( np.array([3,4,5,6,7]), validmask = np.array( [False,False,False,False,False] ) )

s += s2

np.testing.assert_allclose( s.count, np.array([3,3,3,2,2]) )
np.testing.assert_allclose( s.mean, np.array([1,2,3,4,5]) )
np.testing.assert_allclose( s.variance, np.array([1,1,1,2,2]) )
np.testing.assert_allclose( s.min, np.array([0,1,2,3,4]) )
np.testing.assert_allclose( s.max, np.array([2,3,4,5,6]) )


def test_stats_merge3():
import numpy as np
from nectarchain.utils.stats import Stats

s1 = Stats( shape = (5) )
s1.add( np.array([0,0,0,0,0]), validmask = np.array( [False,False,False,False,False] ) )
s1.add( np.array([0,1,2,3,4]), validmask = np.array( [True,True,True,True,True] ) )

s2 = Stats( shape = (5) )
s2.add( np.array([1,2,3,4,5]), validmask = np.array( [True,True,True,False,False] ) )
s2.add( np.array([2,3,4,5,6]), validmask = None )
s2.add( np.array([3,4,5,6,7]), validmask = np.array( [False,False,False,False,False] ) )

s = s1 + s2

assert id(s) != id(s1)
assert id(s) != id(s2)

np.testing.assert_allclose( s.count, np.array([3,3,3,2,2]) )
np.testing.assert_allclose( s.mean, np.array([1,2,3,4,5]) )
np.testing.assert_allclose( s.variance, np.array([1,1,1,2,2]) )
np.testing.assert_allclose( s.min, np.array([0,1,2,3,4]) )
np.testing.assert_allclose( s.max, np.array([2,3,4,5,6]) )

def test_stats_shape():
import numpy as np
from nectarchain.utils.stats import Stats, CameraStats, CameraSampleStats
from ctapipe_io_nectarcam import constants as nc

s = Stats()
assert s.shape == (1,)

s = CameraStats()
assert s.shape == (nc.N_GAINS,nc.N_PIXELS)

s = CameraSampleStats()
assert s.shape == (nc.N_GAINS,nc.N_PIXELS,nc.N_SAMPLES)


def test_stats_print():
import numpy as np
from nectarchain.utils.stats import Stats, CameraStats, CameraSampleStats

s = Stats()
s.add(1)
s.add(2)
s.add(3)


assert s.__str__() == 'mean: [2.]\nstd: [1.]\nmin: [1.]\nmax: [3.]\ncount: [3]\nshape: (1,)'
assert s.__repr__() == s.__str__()

def test_stats_badmerge():
import numpy as np
from nectarchain.utils.stats import Stats

s = Stats()
s.add(1)

s2 = Stats(5)
s2.add([1,2,3,4,5])


with pytest.raises(ValueError, match="Trying to merge from a different shape this:.*"):
s.merge(s2)
Empty file.
206 changes: 206 additions & 0 deletions src/nectarchain/utils/stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Code that implement Welford's algorithm for stats computation

This is inspired by the implementation done at https://github.com/a-mitani/welford
jlenain marked this conversation as resolved.
Show resolved Hide resolved
"""
import numpy as np
import warnings
from copy import deepcopy

from ctapipe_io_nectarcam import constants as nc

class Stats:
"""class Stats
Accumulator object for Welfords online / parallel variance algorithm.


Examples
--------
Example with only one variable
>>> from nectarchain.utils.stats import Stats
>>> s = Stats()
>>> s.add(1)
>>> s.add(2)
>>> s.add(3)
>>> s.mean
2
>>> s.std
1
"""

def __init__(self,shape=(1,)):
"""__init__

Initialize with an optional data.
For the calculation efficiency, Welford's method is not used on the initialization process.

"""
# Initialize instance attributes
self._shape = shape
self._count = np.zeros( shape, dtype=int )
self._m = np.zeros( shape, dtype=float )
self._s = np.zeros( shape, dtype=float )
self._min = np.full( shape, np.inf )
self._max = np.full( shape, -np.inf )

def __str__(self):
infos = ""
infos += f"mean: {self.mean}" + "\n"
infos += f"std: {self.stddev}" + "\n"
infos += f"min: {self.min}" + "\n"
infos += f"max: {self.max}" + "\n"
infos += f"count: {self.count}" + "\n"
infos += f"shape: {self.shape}"
return infos

def __repr__(self):
return self.__str__()

def copy(self):
return deepcopy(self)

def __add__(self, other):
r = self.copy()
r.merge(other)
return r

def __iadd__(self,other):
self.merge(other)
return self

@property
def shape(self):
return self._shape

@property
def count(self):
return self._count

@property
def mean(self):
return self._m

@property
def variance(self):
return self._getvars(ddof=1)

@property
def stddev(self):
return np.sqrt(self._getvars(ddof=1))

@property
def std(self):
return self.stddev

@property
def min(self):
return self._min

@property
def max(self):
return self._max

def get_lowcount_mask(self,mincount=3):
return self._count<mincount


def add(self, element, validmask = None):
"""
Add entry. If mask is given, it will only update the entry from mask
element

Parameters
----------
element : np.array
array of element to added to the stat object (must be similar shape as the Stats object)
validmask : np.array
array that indicate which value to use. Only element entry where validmask is True will be added. It must be a boolean array of the same shape as element

"""

# Welford's algorithm
if validmask is None:
self._count += 1
delta = element - self._m
self._m += delta / self._count
self._s += delta * (element - self._m)
self._min = np.minimum( self._min, element )
self._max = np.maximum( self._max, element )
else:
self._count[validmask] += 1
delta = element[validmask] - self._m[validmask]
self._m[validmask] += delta / self._count[validmask]
self._s[validmask] += delta * (element[validmask] - self._m[validmask])
self._min[validmask] = np.minimum( self._min, element )[validmask]
self._max[validmask] = np.maximum( self._max, element )[validmask]


def merge(self, other):
"""Merge this accumulator with another one.

Parameters
----------
other: nectarchain.utils.stats.Stats
Another object of the same type which you want to combined the statistics
"""
if self._shape != other._shape:
raise ValueError(f"Trying to merge from a different shape this: {self._shape}, given: {other._shape}")

#with warnings.catch_warnings():
count = self._count + other._count
delta = self._m - other._m
delta2 = delta * delta
mean = (self._count * self._m + other._count * other._m) / count
s = self._s + other._s + delta2 * (self._count * other._count) / count

self._count = count
self._m = mean
self._s = s

self._min = np.minimum( self._min, other._min )
self._max = np.maximum( self._max, other._max )


def _getvars(self, ddof):
#with warnings.catch_warnings():
# warnings.simplefilter("ignore", category=RuntimeWarning)
variance = self._s / (self._count - ddof)
variance[ self.get_lowcount_mask(1+ddof) ] = np.nan
return variance


class CameraStats(Stats):
"""class CameraStats
Accumulator object for Welfords online / parallel variance algorithm, specialized for camera info
"""

def __init__(self,shape=(nc.N_GAINS,nc.N_PIXELS), *args, **kwargs):
super().__init__(shape,*args,**kwargs)



class CameraSampleStats(Stats):
"""class CameraSampleStats
Accumulator object for Welfords online / parallel variance algorithm, specialized for trace info

Examples
--------
Cumulating the rawdata from a run to get the average waveform
>>> from nectarchain.utils.stats import CameraSampleStats
>>> from ctapipe_io_nectarcam import NectarCAMEventSource

>>> reader = NectarCAMEventSource(input_url='NectarCAM.Run4560.00??.fits.fz')

>>> s = CameraSampleStats()
>>> for event in reader:
>>> s.add( event.r0.tel[0].waveform, validmask=~evt.mon.tel[0].pixel_status.hardware_failing_pixels )
>>> print(s.mean)
"""

def __init__(self,shape=(nc.N_GAINS,nc.N_PIXELS,nc.N_SAMPLES), *args, **kwargs):
super().__init__(shape,*args,**kwargs)