diff --git a/fink_science/anomaly_detection/processor.py b/fink_science/anomaly_detection/processor.py index e039ff6f..09b60fc4 100644 --- a/fink_science/anomaly_detection/processor.py +++ b/fink_science/anomaly_detection/processor.py @@ -12,8 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from line_profiler import profile - import logging import os import zipfile @@ -46,20 +44,24 @@ def __init__(self, forest_g, forest_r) -> None: self.forest_r = forest_r self.forest_g = forest_g - def anomaly_score(self, data_g, data_r): + def anomaly_score(self, data_r, data_g): scores_g = self.forest_g.run(None, {"X": data_g.values.astype(np.float32)}) scores_r = self.forest_r.run(None, {"X": data_r.values.astype(np.float32)}) return (scores_g[-1] + scores_r[-1]) / 2 + @pandas_udf(DoubleType()) -@profile -def anomaly_score(lc_features, model_type="AADForest"): +def anomaly_score(lc_features, model=None): """Returns anomaly score for an observation Parameters ---------- lc_features: Spark Map Dict of dicts of floats. Keys of first dict - filters (fid), keys of inner dicts - names of features. + model: Spark column of str, optional + Suffix of the model to use; None or '' selects the base (unsuffixed) model. + Otherwise the name must start with a ‘_’ and be ‘_{user_name}’, + where user_name is the user name of the model at https://anomaly.fink-portal.org/. 
Returns ---------- @@ -76,6 +78,8 @@ def anomaly_score(lc_features, model_type="AADForest"): # Required alert columns, concatenated with historical data >>> what = ['magpsf', 'jd', 'sigmapsf', 'fid', 'distnr', 'magnr', 'sigmagnr', 'isdiffpos'] + + >>> MODELS = ['', '_beta'] # '' corresponds to the model for a telegram channel >>> prefix = 'c' >>> what_prefix = [prefix + i for i in what] >>> for colname in what: @@ -83,13 +87,14 @@ def anomaly_score(lc_features, model_type="AADForest"): >>> cols = ['cmagpsf', 'cjd', 'csigmapsf', 'cfid', 'objectId', 'cdistnr', 'cmagnr', 'csigmagnr', 'cisdiffpos'] >>> df = df.withColumn('lc_features', extract_features_ad(*cols)) - >>> df = df.withColumn("anomaly_score", anomaly_score("lc_features")) + >>> for model in MODELS: + ... df = df.withColumn(f'anomaly_score{model}', anomaly_score("lc_features", F.lit(model))) >>> df.filter(df["anomaly_score"] < -0.013).count() - 108 + 96 - >>> df.filter(df["anomaly_score"] == 0).count() - 84 + >>> df.filter(df["anomaly_score"] == 0).count() < 200 + True # Check the robustness of the code when i-band is present >>> df = spark.read.load(ztf_alert_with_i_band) @@ -109,29 +114,6 @@ def anomaly_score(lc_features, model_type="AADForest"): 121 """ - path = os.path.dirname(os.path.abspath(__file__)) - model_path = f"{path}/data/models/anomaly_detection" - g_model_path_AAD = f"{model_path}/forest_g_AAD.onnx" - r_model_path_AAD = f"{model_path}/forest_r_AAD.onnx" - if not (os.path.exists(r_model_path_AAD) and os.path.exists(g_model_path_AAD)): - # unzip in a tmp place - tmp_path = '/tmp' - g_model_path_AAD = f"{tmp_path}/forest_g_AAD.onnx" - r_model_path_AAD = f"{tmp_path}/forest_r_AAD.onnx" - # check it does not exist to avoid concurrent write - if not (os.path.exists(g_model_path_AAD) and os.path.exists(r_model_path_AAD)): - with zipfile.ZipFile(f"{model_path}/anomaly_detection_forest_AAD.zip", 'r') as zip_ref: - zip_ref.extractall(tmp_path) - - forest_r_AAD = rt.InferenceSession(r_model_path_AAD) - 
forest_g_AAD = rt.InferenceSession(g_model_path_AAD) - - # load the mean values used to replace Nan values from the features extraction - r_means = pd.read_csv(f"{model_path}/r_means.csv", header=None, index_col=0, squeeze=True) - g_means = pd.read_csv(f"{model_path}/g_means.csv", header=None, index_col=0, squeeze=True) - - model_AAD = TwoBandModel(forest_g_AAD, forest_r_AAD) - def get_key(x, band): if ( len(x) != 2 or x is None or any( @@ -146,12 +128,21 @@ def get_key(x, band): else: raise IndexError("band {} not found in {}".format(band, x)) + path = os.path.dirname(os.path.abspath(__file__)) + model_path = f"{path}/data/models/anomaly_detection" + + r_means = pd.read_csv(f"{model_path}/r_means.csv", header=None, index_col=0, squeeze=True) + g_means = pd.read_csv(f"{model_path}/g_means.csv", header=None, index_col=0, squeeze=True) data_r = lc_features.apply(lambda x: get_key(x, 1))[MODEL_COLUMNS] data_g = lc_features.apply(lambda x: get_key(x, 2))[MODEL_COLUMNS] mask_r = data_r.isnull().all(1) mask_g = data_g.isnull().all(1) mask = mask_r.values * mask_g.values + if model is not None: + model = model.values[0] + else: + model = '' for col in data_r.columns[data_r.isna().any()]: data_r[col].fillna(r_means[col], inplace=True) @@ -159,6 +150,27 @@ def get_key(x, band): for col in data_g.columns[data_g.isna().any()]: data_g[col].fillna(g_means[col], inplace=True) + g_model_path_AAD = f"{model_path}/forest_g_AAD{model}.onnx" + r_model_path_AAD = f"{model_path}/forest_r_AAD{model}.onnx" + if not (os.path.exists(r_model_path_AAD) and os.path.exists(g_model_path_AAD)): + # unzip in a tmp place + tmp_path = '/tmp' + g_model_path_AAD = f"{tmp_path}/forest_g_AAD{model}.onnx" + r_model_path_AAD = f"{tmp_path}/forest_r_AAD{model}.onnx" + # check it does not exist to avoid concurrent write + if not (os.path.exists(g_model_path_AAD) and os.path.exists(r_model_path_AAD)): + with zipfile.ZipFile(f"{model_path}/anomaly_detection_forest_AAD{model}.zip", 'r') as zip_ref: + 
zip_ref.extractall(tmp_path) + + forest_r_AAD = rt.InferenceSession(r_model_path_AAD) + forest_g_AAD = rt.InferenceSession(g_model_path_AAD) + + # load the mean values used to replace NaN values from the features extraction + r_means = pd.read_csv(f"{model_path}/r_means.csv", header=None, index_col=0, squeeze=True) + g_means = pd.read_csv(f"{model_path}/g_means.csv", header=None, index_col=0, squeeze=True) + + model_AAD = TwoBandModel(forest_g_AAD, forest_r_AAD) + score = model_AAD.anomaly_score(data_r, data_g) score_ = np.transpose(score)[0] score_[mask] = 0.0 diff --git a/fink_science/data/models/anomaly_detection/anomaly_detection_forest.zip b/fink_science/data/models/anomaly_detection/anomaly_detection_forest.zip deleted file mode 100644 index fa6f562a..00000000 Binary files a/fink_science/data/models/anomaly_detection/anomaly_detection_forest.zip and /dev/null differ diff --git a/fink_science/data/models/anomaly_detection/anomaly_detection_forest_AAD_alpha.zip b/fink_science/data/models/anomaly_detection/anomaly_detection_forest_AAD_alpha.zip new file mode 100644 index 00000000..83b9a322 Binary files /dev/null and b/fink_science/data/models/anomaly_detection/anomaly_detection_forest_AAD_alpha.zip differ diff --git a/fink_science/data/models/anomaly_detection/anomaly_detection_forest_AAD_beta.zip b/fink_science/data/models/anomaly_detection/anomaly_detection_forest_AAD_beta.zip new file mode 100644 index 00000000..ec7902f7 Binary files /dev/null and b/fink_science/data/models/anomaly_detection/anomaly_detection_forest_AAD_beta.zip differ diff --git a/fink_science/data/models/anomaly_detection/anomaly_detection_forest_AAD_delta.zip b/fink_science/data/models/anomaly_detection/anomaly_detection_forest_AAD_delta.zip new file mode 100644 index 00000000..d518e127 Binary files /dev/null and b/fink_science/data/models/anomaly_detection/anomaly_detection_forest_AAD_delta.zip differ diff --git 
a/fink_science/data/models/anomaly_detection/anomaly_detection_forest_AAD_epsilon.zip b/fink_science/data/models/anomaly_detection/anomaly_detection_forest_AAD_epsilon.zip new file mode 100644 index 00000000..588923fb Binary files /dev/null and b/fink_science/data/models/anomaly_detection/anomaly_detection_forest_AAD_epsilon.zip differ diff --git a/fink_science/data/models/anomaly_detection/anomaly_detection_forest_AAD_gamma.zip b/fink_science/data/models/anomaly_detection/anomaly_detection_forest_AAD_gamma.zip new file mode 100644 index 00000000..24391720 Binary files /dev/null and b/fink_science/data/models/anomaly_detection/anomaly_detection_forest_AAD_gamma.zip differ diff --git a/fink_science/data/models/anomaly_detection/anomaly_detection_forest_AAD_omega.zip b/fink_science/data/models/anomaly_detection/anomaly_detection_forest_AAD_omega.zip new file mode 100644 index 00000000..0102e10f Binary files /dev/null and b/fink_science/data/models/anomaly_detection/anomaly_detection_forest_AAD_omega.zip differ diff --git a/fink_science/data/models/anomaly_detection/anomaly_detection_forest_AAD_theta.zip b/fink_science/data/models/anomaly_detection/anomaly_detection_forest_AAD_theta.zip new file mode 100644 index 00000000..1d569209 Binary files /dev/null and b/fink_science/data/models/anomaly_detection/anomaly_detection_forest_AAD_theta.zip differ