-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathchange_point_detection.py
183 lines (137 loc) · 6.23 KB
/
change_point_detection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
import numpy as np
import matplotlib.pyplot as plt
import scipy.stats as stats
import math
import pandas as pd
from collections import deque
import copy
import random
import offline_bayesian as bay_cpd
from functools import partial
# - start generator code - #
class Generator(object):
    """
    Base generator. Every call to ``get`` yields the constant 1.0 and bumps
    the ``_changepoint`` counter, which starts at -1.
    """

    def __init__(self):
        self._changepoint = -1

    def get(self):
        """Advance ``_changepoint`` by one and return 1.0."""
        self._changepoint = self._changepoint + 1
        return 1.0
class DistributionGenerator(Generator):
    """
    A generator that draws every value from one fixed distribution. With no
    change in the stream, it can be used to check a change detector for
    false positives.
    dist: An object exposing ``rvs(**kwargs)`` (e.g. a scipy.stats distribution).
    kwargs: Keyword arguments forwarded to ``dist.rvs`` on every draw.
    """

    def __init__(self, dist, **kwargs):
        self._dist, self._args = dist, kwargs
        # Set directly; the base-class initializer is not called.
        self._changepoint = 0

    def get(self):
        """Return one random variate from the configured distribution."""
        draw = self._dist.rvs
        return draw(**self._args)
class ChangingDistributionGenerator(Generator):
    """
    A generator that switches abruptly between two distributions: the first
    ``changepoint`` values come from dist1, every later value from dist2.
    dist1: A distribution exposing ``rvs(**kwargs)``, used before the changepoint.
    kwargs1: A map of keyword arguments passed to ``dist1.rvs``.
    dist2: A distribution exposing ``rvs(**kwargs)``, used after the changepoint.
    kwargs2: A map of keyword arguments passed to ``dist2.rvs``.
    changepoint: The number of values to be generated before switching to dist2.
    """

    # Class-level default; the first get() call shadows it on the instance.
    _position = 0

    def __init__(self, dist1, kwargs1, dist2, kwargs2, changepoint):
        self._dist1, self._kwargs1 = dist1, kwargs1
        self._dist2, self._kwargs2 = dist2, kwargs2
        self._changepoint = changepoint

    def get(self):
        """Return the next variate, choosing the source by current position."""
        self._position += 1
        before_change = self._position <= self._changepoint
        if before_change:
            source, params = self._dist1, self._kwargs1
        else:
            source, params = self._dist2, self._kwargs2
        return source.rvs(**params)
class DriftGenerator(Generator):
    """
    A generator which takes two distinct distributions and a changepoint and returns
    random variates from the first distribution until it has reached the changepoint,
    after which it drifts linearly to the second one.
    dist1: A distribution exposing ``rvs(**kwargs)``, used before the changepoint.
    kwargs1: A map of keyword arguments passed to ``dist1.rvs``.
    dist2: A distribution exposing ``rvs(**kwargs)``, used after the drift.
    kwargs2: A map of keyword arguments passed to ``dist2.rvs``.
    changepoint: The number of values drawn purely from dist1 before drifting starts.
    steps: The number of values produced while drifting from dist1 to dist2.
    """

    # Class-level default; the first get() call shadows it on the instance.
    _position = 0

    def __init__(self, dist1, kwargs1, dist2, kwargs2, changepoint, steps):
        self._dist1 = dist1
        self._kwargs1 = kwargs1
        self._dist2 = dist2
        self._kwargs2 = kwargs2
        self._changepoint = changepoint
        self._steps = steps
        # Blend weights for the drift phase, running from 0 (all dist1) to 1 (all dist2).
        self._change_gradient = np.linspace(0, 1, self._steps)

    def get(self):
        """Return the next variate: dist1, a dist1/dist2 blend, or dist2."""
        self._position += 1
        # Zero-based index into the drift phase; negative while still before
        # the changepoint. Guarding on the sign keeps the lookup below from
        # using index -1, which would wrap to the LAST gradient entry.
        drift_idx = self._position - self._changepoint - 1
        if drift_idx < 0:
            return self._dist1.rvs(**self._kwargs1)
        if drift_idx < self._steps:
            beta = self._change_gradient[drift_idx]
            return ((1 - beta) * self._dist1.rvs(**self._kwargs1)) + (beta * self._dist2.rvs(**self._kwargs2))
        return self._dist2.rvs(**self._kwargs2)
class DataBackedGenerator(Generator):
    """
    A generator that replays a supplied vector one value per call, mirroring
    the interface of the other generators. Once the vector is exhausted,
    ``get`` returns None.
    vec: The vector of values for this generator to produce.
    changepoint: The index at which the change occurs.
    """

    # Class-level default; the first successful get() shadows it on the instance.
    _idx = 0

    def __init__(self, vec, changepoint):
        self._vec = vec
        self._changepoint = changepoint

    def get(self):
        """Return the next stored value, or None when none are left."""
        cursor = self._idx
        if cursor >= len(self._vec):
            return None
        self._idx = cursor + 1
        return self._vec[cursor]
# - end generator code - #
# make different distributions with randomized parameters for the mean (loc) and standard deviation (scale)
dist_length = 100   # number of samples drawn from each generator
distributions = []  # collected (values, changepoints) pairs, one per series


def gen_values(gen):
    """
    Draw ``dist_length`` values from ``gen`` and record the series.

    Appends a ``(values, [gen._changepoint])`` tuple to the module-level
    ``distributions`` list; nothing is returned.
    gen: An object with a ``get()`` method and a ``_changepoint`` attribute.
    """
    samples = np.zeros(dist_length)
    for idx in range(dist_length):
        samples[idx] = gen.get()
    # Read the changepoint only after sampling, since get() may modify it.
    distributions.append((samples, [gen._changepoint]))
# loc = mean of distribution
# scale = standard deviation
def _random_norm_params():
    """Return randomized ``loc``/``scale`` keyword arguments for ``stats.norm``."""
    return {'loc': random.randint(5, 25), 'scale': random.randint(1, 4)}


# Series 0: one stationary normal distribution.
gen_values(DistributionGenerator(stats.norm, **_random_norm_params()))

# Series 1: abrupt switch between two normals at a random changepoint.
gen_values(ChangingDistributionGenerator(
    stats.norm, _random_norm_params(),
    stats.norm, _random_norm_params(),
    random.randint(30, 70),
))

# Series 2: gradual drift between two normals over 5 steps.
gen_values(DriftGenerator(
    stats.norm, _random_norm_params(),
    stats.norm, _random_norm_params(),
    random.randint(30, 70),
    5,
))

# Series 3: series 1 followed by series 2. Its changepoints are the one inside
# series 1, the seam between the two series, and the one inside series 2
# shifted by the length of series 1. Each series holds dist_length samples, so
# the seam is expressed with dist_length rather than a literal 100.
multi_dist = [distributions[1][0], distributions[2][0]]
multi_cps = [distributions[1][1][0], dist_length, dist_length + distributions[2][1][0]]
join_dist = np.hstack(multi_dist)
distributions.append((join_dist, multi_cps))
# TODO: fix code so it works with multiple changepoints
# TODO: iterate through existing distributions and try different CPD methods on each one, using f1 score as metric
def eval_f1_score():
    """Placeholder for the F1-score metric; currently always returns None."""
    return
# Run offline Bayesian changepoint detection on every series and plot the
# series, its recorded changepoints (red) and the detector output (green).
for data in distributions:
    series, true_cps = data
    ll_1, ll_2, cp_prob = bay_cpd.offline_changepoint_detection(
        series,
        partial(bay_cpd.const_prior, l=(len(series) + 1)),
        bay_cpd.gaussian_obs_log_likelihood,
        truncate=-40,
    )
    plt.plot(series)
    # cp_prob is exponentiated and summed over axis 0 to get one value per position.
    guesses = np.exp(cp_prob).sum(0)
    # Matplotlib rejects alpha outside [0, 1]; floating-point summation can
    # push a value marginally past 1, so clamp before using it as opacity.
    guesses = np.clip(guesses, 0.0, 1.0)
    # Alternative to shading: threshold instead, e.g. np.argwhere(guesses > 0.1)
    for cp in true_cps:
        plt.axvline(x=cp, color='red')
    for i, strength in enumerate(guesses):
        plt.axvline(x=i, color='green', alpha=strength)
    plt.title('Bayesian Changepoint Detection (engr-stocks)', fontdict=None)
    plt.show()