Using multiple anomaly detection models #384

Merged
19 commits merged on Jul 29, 2024
186 changes: 95 additions & 91 deletions fink_science/anomaly_detection/processor.py
@@ -44,103 +44,107 @@ def __init__(self, forest_g, forest_r) -> None:
self.forest_r = forest_r
self.forest_g = forest_g

def anomaly_score(self, data_g, data_r):
def anomaly_score(self, data_r, data_g):
scores_g = self.forest_g.run(None, {"X": data_g.values.astype(np.float32)})
scores_r = self.forest_r.run(None, {"X": data_r.values.astype(np.float32)})
return (scores_g[-1] + scores_r[-1]) / 2
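
A quick sketch (not part of this diff) of the class above: TwoBandModel.anomaly_score runs each band through its own ONNX forest and averages the last outputs, so the argument-order fix matters because each forest is band-specific. FakeForest below is a hypothetical stand-in for an onnxruntime InferenceSession:

import numpy as np
import pandas as pd

class FakeForest:
    """Stand-in mimicking rt.InferenceSession, for this sketch only."""
    def __init__(self, value):
        self.value = value

    def run(self, _outputs, feed):  # same call shape as session.run(None, {"X": ...})
        X = feed["X"]
        return [np.full((len(X), 1), self.value, dtype=np.float32)]

model = TwoBandModel(forest_g=FakeForest(-0.2), forest_r=FakeForest(-0.4))
data = pd.DataFrame({"feature": [0.0, 1.0]})
print(model.anomaly_score(data_r=data, data_g=data))  # approx [[-0.3], [-0.3]]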

@pandas_udf(DoubleType())
def anomaly_score(lc_features, model_type="AADForest"):
"""Returns anomaly score for an observation

Parameters
----------
lc_features: Spark Map
    Dict of dicts of floats. Keys of the outer dict are filter IDs (fid);
    keys of the inner dicts are feature names.

Returns
-------
out: float
    Anomaly score

Examples
--------
>>> from fink_utils.spark.utils import concat_col
>>> from pyspark.sql import functions as F
>>> from fink_science.ad_features.processor import extract_features_ad

>>> df = spark.read.load(ztf_alert_sample)

# Required alert columns, concatenated with historical data
>>> what = ['magpsf', 'jd', 'sigmapsf', 'fid', 'distnr', 'magnr', 'sigmagnr', 'isdiffpos']
>>> prefix = 'c'
>>> what_prefix = [prefix + i for i in what]
>>> for colname in what:
... df = concat_col(df, colname, prefix=prefix)

>>> cols = ['cmagpsf', 'cjd', 'csigmapsf', 'cfid', 'objectId', 'cdistnr', 'cmagnr', 'csigmagnr', 'cisdiffpos']
>>> df = df.withColumn('lc_features', extract_features_ad(*cols))
>>> df = df.withColumn("anomaly_score", anomaly_score("lc_features"))

>>> df.filter(df["anomaly_score"] < -0.013).count()
108

>>> df.filter(df["anomaly_score"] == 0).count()
84
"""

path = os.path.dirname(os.path.abspath(__file__))
model_path = f"{path}/data/models/anomaly_detection"
g_model_path_AAD = f"{model_path}/forest_g_AAD.onnx"
r_model_path_AAD = f"{model_path}/forest_r_AAD.onnx"
if not (os.path.exists(r_model_path_AAD) and os.path.exists(g_model_path_AAD)):
# unzip in a tmp place
tmp_path = '/tmp'
g_model_path_AAD = f"{tmp_path}/forest_g_AAD.onnx"
r_model_path_AAD = f"{tmp_path}/forest_r_AAD.onnx"
# check it does not exist to avoid concurrent writes
if not (os.path.exists(g_model_path_AAD) and os.path.exists(r_model_path_AAD)):
with zipfile.ZipFile(f"{model_path}/anomaly_detection_forest_AAD.zip", 'r') as zip_ref:
zip_ref.extractall(tmp_path)

forest_r_AAD = rt.InferenceSession(r_model_path_AAD)
forest_g_AAD = rt.InferenceSession(g_model_path_AAD)

# load the mean values used to replace NaN values from the feature extraction
r_means = pd.read_csv(f"{model_path}/r_means.csv", header=None, index_col=0, squeeze=True)
g_means = pd.read_csv(f"{model_path}/g_means.csv", header=None, index_col=0, squeeze=True)

model_AAD = TwoBandModel(forest_g_AAD, forest_r_AAD)

def get_key(x, band):
    if (
        len(x) != 2 or x is None or any(
            map(  # noqa: W503
                lambda fs: (fs is None or len(fs) == 0), x.values()
            )
        )
    ):
        return pd.Series({k: np.nan for k in MODEL_COLUMNS}, dtype=np.float64)
    else:
        return pd.Series(x[band])

data_r = lc_features.apply(lambda x: get_key(x, 1))[MODEL_COLUMNS]
data_g = lc_features.apply(lambda x: get_key(x, 2))[MODEL_COLUMNS]

mask_r = data_r.isnull().all(1)
mask_g = data_g.isnull().all(1)
mask = mask_r.values * mask_g.values

for col in data_r.columns[data_r.isna().any()]:
    data_r[col].fillna(r_means[col], inplace=True)

for col in data_g.columns[data_g.isna().any()]:
    data_g[col].fillna(g_means[col], inplace=True)

score = model_AAD.anomaly_score(data_r, data_g)
score_ = np.transpose(score)[0]
score_[mask] = 0.0
return pd.Series(score_)


def anomaly_score(lc_features, model=''):
    @pandas_udf(DoubleType())
    def anomaly_score_(lc_features):
        """Returns anomaly score for an observation

        Parameters
        ----------
        lc_features: Spark Map
            Dict of dicts of floats. Keys of the outer dict are filter IDs (fid);
            keys of the inner dicts are feature names.

        Returns
        -------
        out: float
            Anomaly score

        Examples
        --------
        >>> from fink_utils.spark.utils import concat_col
        >>> from pyspark.sql import functions as F
        >>> from fink_science.ad_features.processor import extract_features_ad

        >>> df = spark.read.load(ztf_alert_sample)

        # Required alert columns, concatenated with historical data
        >>> what = ['magpsf', 'jd', 'sigmapsf', 'fid', 'distnr', 'magnr', 'sigmagnr', 'isdiffpos']
        >>> prefix = 'c'
        >>> what_prefix = [prefix + i for i in what]
        >>> for colname in what:
        ...     df = concat_col(df, colname, prefix=prefix)

        >>> cols = ['cmagpsf', 'cjd', 'csigmapsf', 'cfid', 'objectId', 'cdistnr', 'cmagnr', 'csigmagnr', 'cisdiffpos']
        >>> df = df.withColumn('lc_features', extract_features_ad(*cols))
        >>> df = df.withColumn("anomaly_score", anomaly_score("lc_features"))

        >>> df.filter(df["anomaly_score"] < -0.013).count()
        108

        >>> df.filter(df["anomaly_score"] == 0).count()
        84
        """
        def get_key(x, band):
            # check for None before calling len()
            if (
                x is None or len(x) != 2 or any(
                    map(  # noqa: W503
                        lambda fs: (fs is None or len(fs) == 0), x.values()
                    )
                )
            ):
                return pd.Series({k: np.nan for k in MODEL_COLUMNS}, dtype=np.float64)
            else:
                return pd.Series(x[band])

        path = os.path.dirname(os.path.abspath(__file__))
        model_path = f"{path}/data/models/anomaly_detection"

        # load the mean values used to replace NaN values from the feature extraction
        r_means = pd.read_csv(f"{model_path}/r_means.csv", header=None, index_col=0, squeeze=True)
        g_means = pd.read_csv(f"{model_path}/g_means.csv", header=None, index_col=0, squeeze=True)

        data_r = lc_features.apply(lambda x: get_key(x, 1))[MODEL_COLUMNS]
        data_g = lc_features.apply(lambda x: get_key(x, 2))[MODEL_COLUMNS]

        mask_r = data_r.isnull().all(1)
        mask_g = data_g.isnull().all(1)
        mask = mask_r.values * mask_g.values

        for col in data_r.columns[data_r.isna().any()]:
            data_r[col].fillna(r_means[col], inplace=True)

        for col in data_g.columns[data_g.isna().any()]:
            data_g[col].fillna(g_means[col], inplace=True)

        g_model_path_AAD = f"{model_path}/forest_g_AAD{model}.onnx"
        r_model_path_AAD = f"{model_path}/forest_r_AAD{model}.onnx"
        if not (os.path.exists(r_model_path_AAD) and os.path.exists(g_model_path_AAD)):
            # unzip in a tmp place
            tmp_path = '/tmp'
            g_model_path_AAD = f"{tmp_path}/forest_g_AAD{model}.onnx"
            r_model_path_AAD = f"{tmp_path}/forest_r_AAD{model}.onnx"
            # check it does not exist to avoid concurrent writes
            if not (os.path.exists(g_model_path_AAD) and os.path.exists(r_model_path_AAD)):
                with zipfile.ZipFile(f"{model_path}/anomaly_detection_forest_AAD{model}.zip", 'r') as zip_ref:
                    zip_ref.extractall(tmp_path)

        forest_r_AAD = rt.InferenceSession(r_model_path_AAD)
        forest_g_AAD = rt.InferenceSession(g_model_path_AAD)

        model_AAD = TwoBandModel(forest_g_AAD, forest_r_AAD)

        score = model_AAD.anomaly_score(data_r, data_g)
        score_ = np.transpose(score)[0]
        score_[mask] = 0.0
        return pd.Series(score_)

    return anomaly_score_(lc_features)
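
A hedged usage sketch of the new wrapper: model is appended to the ONNX file names (forest_g_AAD{model}.onnx, forest_r_AAD{model}.onnx), so the empty default keeps the original AAD forests, while a non-empty suffix selects an alternative bundle. The '_beta' suffix below is hypothetical, for illustration only:

# default AAD forests: loads forest_g_AAD.onnx / forest_r_AAD.onnx
df = df.withColumn("anomaly_score", anomaly_score("lc_features"))
# hypothetical alternative bundle: would load forest_g_AAD_beta.onnx / forest_r_AAD_beta.onnx
df = df.withColumn("anomaly_score_beta", anomaly_score("lc_features", "_beta"))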

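A small sketch of the zero-masking, with toy values: mask_r and mask_g flag rows whose features are entirely missing in one band, boolean multiplication acts as AND, and only rows missing both bands get their score forced to 0.0 (partial gaps are instead filled with the per-band training means before scoring):

import numpy as np
import pandas as pd

mask_r = pd.Series([True, True, False])   # all r-band features missing?
mask_g = pd.Series([True, False, False])  # all g-band features missing?
mask = mask_r.values * mask_g.values      # boolean AND -> [True, False, False]

score_ = np.array([-0.31, -0.05, 0.12])
score_[mask] = 0.0                        # only the row missing both bands is zeroed
print(score_)                             # [ 0.   -0.05  0.12]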

if __name__ == "__main__":