Skip to content

Commit

Permalink
add class to compute statistics on the fly (#75)
Browse files Browse the repository at this point in the history
* Add class to compute statistics on the fly
* Correct minor issue in print function
* Improved coverage of tests
* Change the hard coded camera constant (number of channel, pixels, samples) to use the one from ctapipe_io_nectarcam.constants.py
  • Loading branch information
vmarandon authored Sep 12, 2023
1 parent 0c65774 commit 39b677b
Show file tree
Hide file tree
Showing 3 changed files with 375 additions and 0 deletions.
169 changes: 169 additions & 0 deletions src/nectarchain/tests/test_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import pytest

def test_stats_simple():
import numpy as np
from nectarchain.utils.stats import Stats

s = Stats()
s.add(1)
s.add(2)
s.add(3)

np.testing.assert_allclose( s.mean, np.array([2.]) )
np.testing.assert_allclose( s.std, np.array([1.]) )
np.testing.assert_allclose( s.max, np.array([3.]))
np.testing.assert_allclose( s.min, np.array([1.]))
np.testing.assert_allclose( s.count, np.array([3]))

def test_stats_camera():
import numpy as np
from nectarchain.utils.stats import Stats

s = Stats( shape = (5) )
s.add( np.array([0,0,0,0,0]), validmask = np.array( [False,False,False,False,False] ) )
s.add( np.array([0,1,2,3,4]), validmask = np.array( [True,True,True,True,True] ) )
s.add( np.array([1,2,3,4,5]), validmask = np.array( [True,True,True,False,False] ) )
s.add( np.array([2,3,4,5,6]), validmask = None )
s.add( np.array([3,4,5,6,7]), validmask = np.array( [False,False,False,False,False] ) )

np.testing.assert_allclose( s.count, np.array([3,3,3,2,2]) )
np.testing.assert_allclose( s.mean, np.array([1,2,3,4,5]) )
np.testing.assert_allclose( s.variance, np.array([1,1,1,2,2]) )
np.testing.assert_allclose( s.min, np.array([0,1,2,3,4]) )
np.testing.assert_allclose( s.max, np.array([2,3,4,5,6]) )

def test_stats_copy():
import numpy as np
from nectarchain.utils.stats import Stats, CameraStats, CameraSampleStats

a = Stats()
assert id(a) != id(a.copy())

a = CameraStats()
assert id(a) != id(a.copy())

a = CameraSampleStats()
assert id(a) != id(a.copy())

def test_stats_lowcounts():

import numpy as np
from nectarchain.utils.stats import Stats

s = Stats( shape = (5) )
s.add( np.array([0,0,0,0,0]), validmask = np.array( [True,True,True,True,False] ))
s.add( np.array([1,1,1,1,1]))
s.add( np.array([2,2,2,2,2]))

np.testing.assert_array_equal( s.get_lowcount_mask(3), np.array([False,False,False,False,True]) )


def test_stats_merge():
import numpy as np
from nectarchain.utils.stats import Stats

s = Stats( shape = (5) )
s.add( np.array([0,0,0,0,0]), validmask = np.array( [False,False,False,False,False] ) )
s.add( np.array([0,1,2,3,4]), validmask = np.array( [True,True,True,True,True] ) )

s2 = Stats( shape = (5) )
s2.add( np.array([1,2,3,4,5]), validmask = np.array( [True,True,True,False,False] ) )
s2.add( np.array([2,3,4,5,6]), validmask = None )
s2.add( np.array([3,4,5,6,7]), validmask = np.array( [False,False,False,False,False] ) )

s.merge(s2)

np.testing.assert_allclose( s.count, np.array([3,3,3,2,2]) )
np.testing.assert_allclose( s.mean, np.array([1,2,3,4,5]) )
np.testing.assert_allclose( s.variance, np.array([1,1,1,2,2]) )
np.testing.assert_allclose( s.min, np.array([0,1,2,3,4]) )
np.testing.assert_allclose( s.max, np.array([2,3,4,5,6]) )


def test_stats_merge2():
import numpy as np
from nectarchain.utils.stats import Stats

s = Stats( shape = (5) )
s.add( np.array([0,0,0,0,0]), validmask = np.array( [False,False,False,False,False] ) )
s.add( np.array([0,1,2,3,4]), validmask = np.array( [True,True,True,True,True] ) )

s2 = Stats( shape = (5) )
s2.add( np.array([1,2,3,4,5]), validmask = np.array( [True,True,True,False,False] ) )
s2.add( np.array([2,3,4,5,6]), validmask = None )
s2.add( np.array([3,4,5,6,7]), validmask = np.array( [False,False,False,False,False] ) )

s += s2

np.testing.assert_allclose( s.count, np.array([3,3,3,2,2]) )
np.testing.assert_allclose( s.mean, np.array([1,2,3,4,5]) )
np.testing.assert_allclose( s.variance, np.array([1,1,1,2,2]) )
np.testing.assert_allclose( s.min, np.array([0,1,2,3,4]) )
np.testing.assert_allclose( s.max, np.array([2,3,4,5,6]) )


def test_stats_merge3():
import numpy as np
from nectarchain.utils.stats import Stats

s1 = Stats( shape = (5) )
s1.add( np.array([0,0,0,0,0]), validmask = np.array( [False,False,False,False,False] ) )
s1.add( np.array([0,1,2,3,4]), validmask = np.array( [True,True,True,True,True] ) )

s2 = Stats( shape = (5) )
s2.add( np.array([1,2,3,4,5]), validmask = np.array( [True,True,True,False,False] ) )
s2.add( np.array([2,3,4,5,6]), validmask = None )
s2.add( np.array([3,4,5,6,7]), validmask = np.array( [False,False,False,False,False] ) )

s = s1 + s2

assert id(s) != id(s1)
assert id(s) != id(s2)

np.testing.assert_allclose( s.count, np.array([3,3,3,2,2]) )
np.testing.assert_allclose( s.mean, np.array([1,2,3,4,5]) )
np.testing.assert_allclose( s.variance, np.array([1,1,1,2,2]) )
np.testing.assert_allclose( s.min, np.array([0,1,2,3,4]) )
np.testing.assert_allclose( s.max, np.array([2,3,4,5,6]) )

def test_stats_shape():
import numpy as np
from nectarchain.utils.stats import Stats, CameraStats, CameraSampleStats
from ctapipe_io_nectarcam import constants as nc

s = Stats()
assert s.shape == (1,)

s = CameraStats()
assert s.shape == (nc.N_GAINS,nc.N_PIXELS)

s = CameraSampleStats()
assert s.shape == (nc.N_GAINS,nc.N_PIXELS,nc.N_SAMPLES)


def test_stats_print():
import numpy as np
from nectarchain.utils.stats import Stats, CameraStats, CameraSampleStats

s = Stats()
s.add(1)
s.add(2)
s.add(3)


assert s.__str__() == 'mean: [2.]\nstd: [1.]\nmin: [1.]\nmax: [3.]\ncount: [3]\nshape: (1,)'
assert s.__repr__() == s.__str__()

def test_stats_badmerge():
import numpy as np
from nectarchain.utils.stats import Stats

s = Stats()
s.add(1)

s2 = Stats(5)
s2.add([1,2,3,4,5])


with pytest.raises(ValueError, match="Trying to merge from a different shape this:.*"):
s.merge(s2)
Empty file.
206 changes: 206 additions & 0 deletions src/nectarchain/utils/stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Code that implement Welford's algorithm for stats computation
This is inspired by the implementation done at https://github.com/a-mitani/welford
"""
import numpy as np
import warnings
from copy import deepcopy

from ctapipe_io_nectarcam import constants as nc

class Stats:
"""class Stats
Accumulator object for Welfords online / parallel variance algorithm.
Examples
--------
Example with only one variable
>>> from nectarchain.utils.stats import Stats
>>> s = Stats()
>>> s.add(1)
>>> s.add(2)
>>> s.add(3)
>>> s.mean
2
>>> s.std
1
"""

def __init__(self,shape=(1,)):
"""__init__
Initialize with an optional data.
For the calculation efficiency, Welford's method is not used on the initialization process.
"""
# Initialize instance attributes
self._shape = shape
self._count = np.zeros( shape, dtype=int )
self._m = np.zeros( shape, dtype=float )
self._s = np.zeros( shape, dtype=float )
self._min = np.full( shape, np.inf )
self._max = np.full( shape, -np.inf )

def __str__(self):
infos = ""
infos += f"mean: {self.mean}" + "\n"
infos += f"std: {self.stddev}" + "\n"
infos += f"min: {self.min}" + "\n"
infos += f"max: {self.max}" + "\n"
infos += f"count: {self.count}" + "\n"
infos += f"shape: {self.shape}"
return infos

def __repr__(self):
return self.__str__()

def copy(self):
return deepcopy(self)

def __add__(self, other):
r = self.copy()
r.merge(other)
return r

def __iadd__(self,other):
self.merge(other)
return self

@property
def shape(self):
return self._shape

@property
def count(self):
return self._count

@property
def mean(self):
return self._m

@property
def variance(self):
return self._getvars(ddof=1)

@property
def stddev(self):
return np.sqrt(self._getvars(ddof=1))

@property
def std(self):
return self.stddev

@property
def min(self):
return self._min

@property
def max(self):
return self._max

def get_lowcount_mask(self,mincount=3):
return self._count<mincount


def add(self, element, validmask = None):
"""
Add entry. If mask is given, it will only update the entry from mask
element
Parameters
----------
element : np.array
array of element to added to the stat object (must be similar shape as the Stats object)
validmask : np.array
array that indicate which value to use. Only element entry where validmask is True will be added. It must be a boolean array of the same shape as element
"""

# Welford's algorithm
if validmask is None:
self._count += 1
delta = element - self._m
self._m += delta / self._count
self._s += delta * (element - self._m)
self._min = np.minimum( self._min, element )
self._max = np.maximum( self._max, element )
else:
self._count[validmask] += 1
delta = element[validmask] - self._m[validmask]
self._m[validmask] += delta / self._count[validmask]
self._s[validmask] += delta * (element[validmask] - self._m[validmask])
self._min[validmask] = np.minimum( self._min, element )[validmask]
self._max[validmask] = np.maximum( self._max, element )[validmask]


def merge(self, other):
"""Merge this accumulator with another one.
Parameters
----------
other: nectarchain.utils.stats.Stats
Another object of the same type which you want to combined the statistics
"""
if self._shape != other._shape:
raise ValueError(f"Trying to merge from a different shape this: {self._shape}, given: {other._shape}")

#with warnings.catch_warnings():
count = self._count + other._count
delta = self._m - other._m
delta2 = delta * delta
mean = (self._count * self._m + other._count * other._m) / count
s = self._s + other._s + delta2 * (self._count * other._count) / count

self._count = count
self._m = mean
self._s = s

self._min = np.minimum( self._min, other._min )
self._max = np.maximum( self._max, other._max )


def _getvars(self, ddof):
#with warnings.catch_warnings():
# warnings.simplefilter("ignore", category=RuntimeWarning)
variance = self._s / (self._count - ddof)
variance[ self.get_lowcount_mask(1+ddof) ] = np.nan
return variance


class CameraStats(Stats):
"""class CameraStats
Accumulator object for Welfords online / parallel variance algorithm, specialized for camera info
"""

def __init__(self,shape=(nc.N_GAINS,nc.N_PIXELS), *args, **kwargs):
super().__init__(shape,*args,**kwargs)



class CameraSampleStats(Stats):
"""class CameraSampleStats
Accumulator object for Welfords online / parallel variance algorithm, specialized for trace info
Examples
--------
Cumulating the rawdata from a run to get the average waveform
>>> from nectarchain.utils.stats import CameraSampleStats
>>> from ctapipe_io_nectarcam import NectarCAMEventSource
>>> reader = NectarCAMEventSource(input_url='NectarCAM.Run4560.00??.fits.fz')
>>> s = CameraSampleStats()
>>> for event in reader:
>>> s.add( event.r0.tel[0].waveform, validmask=~evt.mon.tel[0].pixel_status.hardware_failing_pixels )
>>> print(s.mean)
"""

def __init__(self,shape=(nc.N_GAINS,nc.N_PIXELS,nc.N_SAMPLES), *args, **kwargs):
super().__init__(shape,*args,**kwargs)


0 comments on commit 39b677b

Please sign in to comment.