Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using multiple anomaly detection models #384

Merged
merged 19 commits into from
Jul 29, 2024
76 changes: 44 additions & 32 deletions fink_science/anomaly_detection/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from line_profiler import profile

import logging
import os
import zipfile
Expand Down Expand Up @@ -46,20 +44,24 @@ def __init__(self, forest_g, forest_r) -> None:
self.forest_r = forest_r
self.forest_g = forest_g

def anomaly_score(self, data_g, data_r):
def anomaly_score(self, data_r, data_g):
scores_g = self.forest_g.run(None, {"X": data_g.values.astype(np.float32)})
scores_r = self.forest_r.run(None, {"X": data_r.values.astype(np.float32)})
return (scores_g[-1] + scores_r[-1]) / 2


@pandas_udf(DoubleType())
@profile
def anomaly_score(lc_features, model_type="AADForest"):
def anomaly_score(lc_features, model=None):
"""Returns anomaly score for an observation

Parameters
----------
lc_features: Spark Map
Dict of dicts of floats. Keys of first dict - filters (fid), keys of inner dicts - names of features.
model: str
Name of the model used.
Name must start with a ‘_’ and be ‘_{user_name}’,
where user_name is the user name of the model at https://anomaly.fink-portal.org/.

Returns
----------
Expand All @@ -76,20 +78,23 @@ def anomaly_score(lc_features, model_type="AADForest"):

# Required alert columns, concatenated with historical data
>>> what = ['magpsf', 'jd', 'sigmapsf', 'fid', 'distnr', 'magnr', 'sigmagnr', 'isdiffpos']

>>> MODELS = ['', '_beta'] # '' corresponds to the model for a telegram channel
>>> prefix = 'c'
>>> what_prefix = [prefix + i for i in what]
>>> for colname in what:
... df = concat_col(df, colname, prefix=prefix)

>>> cols = ['cmagpsf', 'cjd', 'csigmapsf', 'cfid', 'objectId', 'cdistnr', 'cmagnr', 'csigmagnr', 'cisdiffpos']
>>> df = df.withColumn('lc_features', extract_features_ad(*cols))
>>> df = df.withColumn("anomaly_score", anomaly_score("lc_features"))
>>> for model in MODELS:
... df = df.withColumn(f'anomaly_score{model}', anomaly_score("lc_features", F.lit(model)))

>>> df.filter(df["anomaly_score"] < -0.013).count()
JulienPeloton marked this conversation as resolved.
Show resolved Hide resolved
108
96

>>> df.filter(df["anomaly_score"] == 0).count()
84
>>> df.filter(df["anomaly_score"] == 0).count() < 200
True

# Check the robustness of the code when i-band is present
>>> df = spark.read.load(ztf_alert_with_i_band)
Expand All @@ -109,29 +114,6 @@ def anomaly_score(lc_features, model_type="AADForest"):
121
"""

path = os.path.dirname(os.path.abspath(__file__))
model_path = f"{path}/data/models/anomaly_detection"
g_model_path_AAD = f"{model_path}/forest_g_AAD.onnx"
r_model_path_AAD = f"{model_path}/forest_r_AAD.onnx"
if not (os.path.exists(r_model_path_AAD) and os.path.exists(g_model_path_AAD)):
# unzip in a tmp place
tmp_path = '/tmp'
g_model_path_AAD = f"{tmp_path}/forest_g_AAD.onnx"
r_model_path_AAD = f"{tmp_path}/forest_r_AAD.onnx"
# check it does not exist to avoid concurrent write
if not (os.path.exists(g_model_path_AAD) and os.path.exists(r_model_path_AAD)):
with zipfile.ZipFile(f"{model_path}/anomaly_detection_forest_AAD.zip", 'r') as zip_ref:
zip_ref.extractall(tmp_path)

forest_r_AAD = rt.InferenceSession(r_model_path_AAD)
forest_g_AAD = rt.InferenceSession(g_model_path_AAD)

# load the mean values used to replace Nan values from the features extraction
r_means = pd.read_csv(f"{model_path}/r_means.csv", header=None, index_col=0, squeeze=True)
g_means = pd.read_csv(f"{model_path}/g_means.csv", header=None, index_col=0, squeeze=True)

model_AAD = TwoBandModel(forest_g_AAD, forest_r_AAD)

def get_key(x, band):
if (
len(x) != 2 or x is None or any(
Expand All @@ -146,19 +128,49 @@ def get_key(x, band):
else:
raise IndexError("band {} not found in {}".format(band, x))

path = os.path.dirname(os.path.abspath(__file__))
model_path = f"{path}/data/models/anomaly_detection"

r_means = pd.read_csv(f"{model_path}/r_means.csv", header=None, index_col=0, squeeze=True)
g_means = pd.read_csv(f"{model_path}/g_means.csv", header=None, index_col=0, squeeze=True)
data_r = lc_features.apply(lambda x: get_key(x, 1))[MODEL_COLUMNS]
data_g = lc_features.apply(lambda x: get_key(x, 2))[MODEL_COLUMNS]

mask_r = data_r.isnull().all(1)
mask_g = data_g.isnull().all(1)
mask = mask_r.values * mask_g.values
if model is not None:
model = model.values[0]
else:
model = ''
JulienPeloton marked this conversation as resolved.
Show resolved Hide resolved

for col in data_r.columns[data_r.isna().any()]:
data_r[col].fillna(r_means[col], inplace=True)

for col in data_g.columns[data_g.isna().any()]:
data_g[col].fillna(g_means[col], inplace=True)

g_model_path_AAD = f"{model_path}/forest_g_AAD{model}.onnx"
r_model_path_AAD = f"{model_path}/forest_r_AAD{model}.onnx"
if not (os.path.exists(r_model_path_AAD) and os.path.exists(g_model_path_AAD)):
# unzip in a tmp place
tmp_path = '/tmp'
g_model_path_AAD = f"{tmp_path}/forest_g_AAD{model}.onnx"
r_model_path_AAD = f"{tmp_path}/forest_r_AAD{model}.onnx"
# check it does not exist to avoid concurrent write
if not (os.path.exists(g_model_path_AAD) and os.path.exists(r_model_path_AAD)):
with zipfile.ZipFile(f"{model_path}/anomaly_detection_forest_AAD{model}.zip", 'r') as zip_ref:
zip_ref.extractall(tmp_path)

forest_r_AAD = rt.InferenceSession(r_model_path_AAD)
forest_g_AAD = rt.InferenceSession(g_model_path_AAD)

# load the mean values used to replace Nan values from the features extraction
r_means = pd.read_csv(f"{model_path}/r_means.csv", header=None, index_col=0, squeeze=True)
g_means = pd.read_csv(f"{model_path}/g_means.csv", header=None, index_col=0, squeeze=True)

model_AAD = TwoBandModel(forest_g_AAD, forest_r_AAD)

score = model_AAD.anomaly_score(data_r, data_g)
score_ = np.transpose(score)[0]
score_[mask] = 0.0
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading