Using multiple anomaly detection models (#384)
* Use of multiple models

* Specifying a single model

* flake8

* wrapper_fix

* add lit

* pep8

* pep8

* arg fix

* fix

* docstring update

* get key fix

* add models

* model[0] -> model.values[0]

* test->96

* Update tests

* remove profile

* test fix
Knispel2 authored Jul 29, 2024
1 parent 5910070 commit 96dc90e
Showing 9 changed files with 44 additions and 32 deletions.
76 changes: 44 additions & 32 deletions fink_science/anomaly_detection/processor.py
@@ -12,8 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from line_profiler import profile
-
import logging
import os
import zipfile
@@ -46,20 +44,24 @@ def __init__(self, forest_g, forest_r) -> None:
self.forest_r = forest_r
self.forest_g = forest_g

-def anomaly_score(self, data_g, data_r):
+def anomaly_score(self, data_r, data_g):
scores_g = self.forest_g.run(None, {"X": data_g.values.astype(np.float32)})
scores_r = self.forest_r.run(None, {"X": data_r.values.astype(np.float32)})
return (scores_g[-1] + scores_r[-1]) / 2
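
For orientation only (not part of the commit), a minimal sketch of how TwoBandModel could be exercised outside Spark. The ONNX file names, the toy feature values, and the availability of MODEL_COLUMNS in scope are assumptions:

import numpy as np
import onnxruntime as rt
import pandas as pd

# Assumed: the two per-band ONNX forests have been extracted locally and
# expose a float32 input named "X", as in the module above.
forest_g = rt.InferenceSession("forest_g_AAD.onnx")
forest_r = rt.InferenceSession("forest_r_AAD.onnx")
model = TwoBandModel(forest_g, forest_r)

# One row per alert, one column per light-curve feature, already NaN-free.
features = list(MODEL_COLUMNS)  # feature names defined elsewhere in the package
data_r = pd.DataFrame(np.random.rand(4, len(features)), columns=features)
data_g = pd.DataFrame(np.random.rand(4, len(features)), columns=features)

scores = model.anomaly_score(data_r, data_g)  # mean of the g- and r-band scores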


@pandas_udf(DoubleType())
-@profile
-def anomaly_score(lc_features, model_type="AADForest"):
+def anomaly_score(lc_features, model=None):
"""Returns anomaly score for an observation
Parameters
----------
lc_features: Spark Map
Dict of dicts of floats. Keys of first dict - filters (fid), keys of inner dicts - names of features.
+model: str
+    Name of the model used.
+    The name must start with '_' and have the form '_{user_name}',
+    where user_name is the user name under which the model is published at https://anomaly.fink-portal.org/.
Returns
----------
@@ -76,20 +78,23 @@ def anomaly_score(lc_features, model_type="AADForest"):
# Required alert columns, concatenated with historical data
>>> what = ['magpsf', 'jd', 'sigmapsf', 'fid', 'distnr', 'magnr', 'sigmagnr', 'isdiffpos']
+>>> MODELS = ['', '_beta'] # '' corresponds to the model for a telegram channel
>>> prefix = 'c'
>>> what_prefix = [prefix + i for i in what]
>>> for colname in what:
... df = concat_col(df, colname, prefix=prefix)
>>> cols = ['cmagpsf', 'cjd', 'csigmapsf', 'cfid', 'objectId', 'cdistnr', 'cmagnr', 'csigmagnr', 'cisdiffpos']
>>> df = df.withColumn('lc_features', extract_features_ad(*cols))
>>> df = df.withColumn("anomaly_score", anomaly_score("lc_features"))
>>> for model in MODELS:
... df = df.withColumn(f'anomaly_score{model}', anomaly_score("lc_features", F.lit(model)))
>>> df.filter(df["anomaly_score"] < -0.013).count()
-108
+96
->>> df.filter(df["anomaly_score"] == 0).count()
-84
+>>> df.filter(df["anomaly_score"] == 0).count() < 200
+True
# Check the robustness of the code when i-band is present
>>> df = spark.read.load(ztf_alert_with_i_band)
@@ -109,29 +114,6 @@ def anomaly_score(lc_features, model_type="AADForest"):
121
"""

-path = os.path.dirname(os.path.abspath(__file__))
-model_path = f"{path}/data/models/anomaly_detection"
-g_model_path_AAD = f"{model_path}/forest_g_AAD.onnx"
-r_model_path_AAD = f"{model_path}/forest_r_AAD.onnx"
-if not (os.path.exists(r_model_path_AAD) and os.path.exists(g_model_path_AAD)):
-    # unzip in a tmp place
-    tmp_path = '/tmp'
-    g_model_path_AAD = f"{tmp_path}/forest_g_AAD.onnx"
-    r_model_path_AAD = f"{tmp_path}/forest_r_AAD.onnx"
-    # check it does not exist to avoid concurrent write
-    if not (os.path.exists(g_model_path_AAD) and os.path.exists(r_model_path_AAD)):
-        with zipfile.ZipFile(f"{model_path}/anomaly_detection_forest_AAD.zip", 'r') as zip_ref:
-            zip_ref.extractall(tmp_path)
-
-forest_r_AAD = rt.InferenceSession(r_model_path_AAD)
-forest_g_AAD = rt.InferenceSession(g_model_path_AAD)
-
-# load the mean values used to replace Nan values from the features extraction
-r_means = pd.read_csv(f"{model_path}/r_means.csv", header=None, index_col=0, squeeze=True)
-g_means = pd.read_csv(f"{model_path}/g_means.csv", header=None, index_col=0, squeeze=True)
-
-model_AAD = TwoBandModel(forest_g_AAD, forest_r_AAD)
-
def get_key(x, band):
if (
len(x) != 2 or x is None or any(
@@ -146,19 +128,49 @@ def get_key(x, band):
else:
raise IndexError("band {} not found in {}".format(band, x))

+path = os.path.dirname(os.path.abspath(__file__))
+model_path = f"{path}/data/models/anomaly_detection"
+
+r_means = pd.read_csv(f"{model_path}/r_means.csv", header=None, index_col=0, squeeze=True)
+g_means = pd.read_csv(f"{model_path}/g_means.csv", header=None, index_col=0, squeeze=True)
data_r = lc_features.apply(lambda x: get_key(x, 1))[MODEL_COLUMNS]
data_g = lc_features.apply(lambda x: get_key(x, 2))[MODEL_COLUMNS]

mask_r = data_r.isnull().all(1)
mask_g = data_g.isnull().all(1)
mask = mask_r.values * mask_g.values
+if model is not None:
+    model = model.values[0]
+else:
+    model = ''

for col in data_r.columns[data_r.isna().any()]:
data_r[col].fillna(r_means[col], inplace=True)

for col in data_g.columns[data_g.isna().any()]:
data_g[col].fillna(g_means[col], inplace=True)

+g_model_path_AAD = f"{model_path}/forest_g_AAD{model}.onnx"
+r_model_path_AAD = f"{model_path}/forest_r_AAD{model}.onnx"
+if not (os.path.exists(r_model_path_AAD) and os.path.exists(g_model_path_AAD)):
+    # unzip in a tmp place
+    tmp_path = '/tmp'
+    g_model_path_AAD = f"{tmp_path}/forest_g_AAD{model}.onnx"
+    r_model_path_AAD = f"{tmp_path}/forest_r_AAD{model}.onnx"
+    # check it does not exist to avoid concurrent write
+    if not (os.path.exists(g_model_path_AAD) and os.path.exists(r_model_path_AAD)):
+        with zipfile.ZipFile(f"{model_path}/anomaly_detection_forest_AAD{model}.zip", 'r') as zip_ref:
+            zip_ref.extractall(tmp_path)
+
+forest_r_AAD = rt.InferenceSession(r_model_path_AAD)
+forest_g_AAD = rt.InferenceSession(g_model_path_AAD)
+
+# load the mean values used to replace Nan values from the features extraction
+r_means = pd.read_csv(f"{model_path}/r_means.csv", header=None, index_col=0, squeeze=True)
+g_means = pd.read_csv(f"{model_path}/g_means.csv", header=None, index_col=0, squeeze=True)
+
+model_AAD = TwoBandModel(forest_g_AAD, forest_r_AAD)

score = model_AAD.anomaly_score(data_r, data_g)
score_ = np.transpose(score)[0]
score_[mask] = 0.0
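
As a side illustration (toy values, not from the commit), the masking above guarantees that alerts with no usable features in either band receive a score of exactly 0:

import numpy as np
import pandas as pd

# Two toy alerts with two features per band; the second alert has no
# usable features in either band.
data_r = pd.DataFrame({"f1": [0.3, np.nan], "f2": [1.2, np.nan]})
data_g = pd.DataFrame({"f1": [0.7, np.nan], "f2": [0.9, np.nan]})
mask = data_r.isnull().all(1).values * data_g.isnull().all(1).values

score_ = np.array([-0.42, -0.05])  # stand-in for the model output
score_[mask] = 0.0                 # -> array([-0.42, 0.])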
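
Putting the change together, a hedged sketch of how the updated UDF might be applied on a Fink DataFrame, following the doctest above; the pre-existing DataFrame `df` with concatenated alert history columns and the import path of extract_features_ad are assumptions:

from pyspark.sql import functions as F

from fink_science.ad_features.processor import extract_features_ad  # import path assumed
from fink_science.anomaly_detection.processor import anomaly_score

# 'df' is assumed to already carry the concatenated history columns used
# in the doctest (cmagpsf, cjd, csigmapsf, cfid, cdistnr, ...).
cols = ['cmagpsf', 'cjd', 'csigmapsf', 'cfid', 'objectId',
        'cdistnr', 'cmagnr', 'csigmagnr', 'cisdiffpos']
df = df.withColumn('lc_features', extract_features_ad(*cols))

# '' selects the default model, '_beta' a user model from anomaly.fink-portal.org
for model in ['', '_beta']:
    df = df.withColumn(f'anomaly_score{model}',
                       anomaly_score('lc_features', F.lit(model)))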
8 binary files not shown.
