diff --git a/.github/workflows/run_test.yml b/.github/workflows/run_test.yml
index 48284e8c..b5bb6550 100644
--- a/.github/workflows/run_test.yml
+++ b/.github/workflows/run_test.yml
@@ -12,7 +12,7 @@ jobs:
     strategy:
       matrix:
-        container: ["julienpeloton/fink-ci:prod", "julienpeloton/fink-ci:dev"]
+        container: ["julienpeloton/fink-ci:dev"]
 
     container:
       image: ${{ matrix.container }}
@@ -43,9 +43,13 @@ jobs:
       - name: Run test suites
         run: |
           pip uninstall -y actsnfink
-          pip install git+https://github.com/emilleishida/fink_sn_activelearning.git@bf8d4e263e02d42781642f872f7bc030c24792bc#egg=actsnfink
+          pip install git+https://github.com/emilleishida/fink_sn_activelearning.git@4f46b3a1e29de45793125452974e71e92c1ea454#egg=actsnfink
+          pip install onnx==1.12.0
           pip install onnxruntime
+          pip install py4j
+          pip install light-curve[full] --upgrade
+          pip install iminuit==2.21.0
           ./run_tests.sh
           curl -s https://codecov.io/bash | bash
diff --git a/.travis.yml b/.travis.yml
index e31eaf85..d8439db8 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -35,6 +35,7 @@ install:
   - pip install --upgrade pip setuptools wheel
   - pip install -r requirements.txt
   - pip install torch==1.6.0+cpu -f https://download.pytorch.org/whl/torch_stable.html
+  - python3 -m pip install 'light-curve[full]'
 
 script:
   # Execute tests and report coverage
diff --git a/fink_science/data/alerts/elasticc_parquet/test_elasticc_earlysnia.parquet b/fink_science/data/alerts/elasticc_parquet/test_elasticc_earlysnia.parquet
new file mode 100644
index 00000000..2aa766e6
Binary files /dev/null and b/fink_science/data/alerts/elasticc_parquet/test_elasticc_earlysnia.parquet differ
diff --git a/fink_science/data/alerts/test_elasticc_earlysnia.parquet b/fink_science/data/alerts/test_elasticc_earlysnia.parquet
new file mode 100644
index 00000000..bcdf56a5
Binary files /dev/null and b/fink_science/data/alerts/test_elasticc_earlysnia.parquet differ
diff --git a/fink_science/data/models/elasticc_rainbow_earlyIa.joblib b/fink_science/data/models/elasticc_rainbow_earlyIa.joblib
new file mode 100644
index 00000000..6d4a01c1
Binary files /dev/null and b/fink_science/data/models/elasticc_rainbow_earlyIa.joblib differ
diff --git a/fink_science/data/models/elasticc_rainbow_earlyIa.pkl b/fink_science/data/models/elasticc_rainbow_earlyIa.pkl
new file mode 100644
index 00000000..fd75402a
Binary files /dev/null and b/fink_science/data/models/elasticc_rainbow_earlyIa.pkl differ
diff --git a/fink_science/random_forest_snia/processor.py b/fink_science/random_forest_snia/processor.py
index cc6bcd81..a942de60 100644
--- a/fink_science/random_forest_snia/processor.py
+++ b/fink_science/random_forest_snia/processor.py
@@ -19,6 +19,7 @@
 import numpy as np
 import os
+import pickle
 
 from fink_science import __file__
@@ -27,12 +28,20 @@
 from fink_utils.xmatch.simbad import return_list_of_eg_host
 
 from actsnfink.classifier_sigmoid import get_sigmoid_features_dev
-from actsnfink.classifier_sigmoid import get_sigmoid_features_elasticc_perfilter
+# from actsnfink.classifier_sigmoid import get_sigmoid_features_elasticc_perfilter
+from actsnfink.rainbow import fit_rainbow
 from actsnfink.classifier_sigmoid import RF_FEATURE_NAMES
 
 from fink_science.tester import spark_unit_tests
 
+RAINBOW_FEATURES_NAMES = [
+    "amplitude", "rise_time",
+    "Tmin", "delta_T", "k_sig",
+    "reduced_chi2", "lc_max"
+]
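+# NOTE (assumption): these names are taken to match, one-to-one and in order,
+# the values returned by actsnfink.rainbow.fit_rainbow once its first element
+# is dropped (see extract_features_rainbow, which returns features[1:]).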
+
+
 def apply_selection_cuts_ztf(
         magpsf: pd.Series, ndethist: pd.Series, cdsxmatch: pd.Series,
         minpoints: int = 4, maxndethist: int = 20) -> pd.Series:
@@ -312,16 +321,97 @@ def extract_features_rf_snia(
     return pd.Series(concatenated_features)
 
 
+def extract_features_rainbow(
+        midPointTai, filterName, cpsFlux, cpsFluxErr,
+        band_wave_aa={'u': 3671.0, 'g': 4827.0, 'r': 6223.0, 'i': 7546.0, 'z': 8691.0, 'Y': 9712.0},
+        with_baseline=False,
+        min_data_points=7,
+        low_bound=-10) -> np.ndarray:
+    """ Return the features used by the RF classifier for one alert.
+
+    Features (incl. order) are given by `RAINBOW_FEATURES_NAMES`.
+
+    Parameters
+    ----------
+    midPointTai: np.array of floats
+        MJD vector for one object
+    filterName: np.array of str
+        Filter name vector for one object
+    cpsFlux, cpsFluxErr: np.array of float
+        Flux from PSF-fit photometry, and 1-sigma error
+    band_wave_aa: dict (optional)
+        Dictionary with the effective wavelength of each filter.
+        Default is for ELAsTiCC (LSST passbands u, g, r, i, z, Y).
+    with_baseline: bool (optional)
+        Baseline to be considered. Default is False (baseline 0).
+    min_data_points: int (optional)
+        Minimum number of data points in all filters. Default is 7.
+    low_bound: float (optional)
+        Lower bound of FLUXCAL to consider. Default is -10.
+
+    Returns
+    ----------
+    features: np.array of floats
+        Rainbow best-fit parameter values.
+
+    Examples
+    ----------
+    >>> from fink_utils.spark.utils import concat_col
+    >>> from pyspark.sql import functions as F
+
+    >>> df = spark.read.load(elasticc_alert_sample)
+
+    # Required alert columns
+    >>> what = ['midPointTai', 'filterName', 'psFlux', 'psFluxErr']
+
+    # Use for creating temp name
+    >>> prefix = 'c'
+    >>> what_prefix = [prefix + i for i in what]
+
+    # Append temp columns with historical + current measurements
+    >>> for colname in what:
+    ...     df = concat_col(
+    ...         df, colname, prefix=prefix,
+    ...         current='diaSource', history='prvDiaForcedSources')
+
+    >>> pdf = df.select(what_prefix).toPandas()
+
+    # Test no NaNs
+    >>> for index, alert in pdf.iterrows():
+    ...     a_feature = extract_features_rainbow(*[np.array(x) for x in alert.values])
+    ...     assert np.all(~np.isnan(a_feature))
+    """
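+    # Rainbow fits a single model across all passbands at once, so it needs a
+    # minimum number of points overall; below that threshold we return zeros
+    # so that every alert gets a feature vector of fixed length.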
+    if len(midPointTai) < min_data_points:
+        return np.zeros(len(RAINBOW_FEATURES_NAMES), dtype=float)
+
+    features = fit_rainbow(
+        midPointTai, filterName, cpsFlux, cpsFluxErr,
+        band_wave_aa=band_wave_aa,
+        with_baseline=with_baseline,
+        min_data_points=min_data_points,
+        list_filters=band_wave_aa.keys(),
+        low_bound=low_bound
+    )
+
+    return np.asarray(features[1:], dtype=float)
+
+
 @pandas_udf(DoubleType(), PandasUDFType.SCALAR)
-def rfscore_sigmoid_elasticc(
-        midPointTai, filterName, psFlux, psFluxErr,
-        ra, dec, hostgal_ra, hostgal_dec, hostgal_snsep,
-        hostgal_zphot, hostgal_zphot_err,
+def rfscore_rainbow_elasticc(
+        midPointTai, filterName, cpsFlux, cpsFluxErr,
+        snr,
+        hostgal_snsep,
+        hostgal_zphot,
         maxduration=None,
-        model=None) -> pd.Series:
+        model=None,
+        band_wave_aa=pd.Series([{'u': 3671.0, 'g': 4827.0, 'r': 6223.0, 'i': 7546.0, 'z': 8691.0, 'Y': 9712.0}]),
+        with_baseline=pd.Series([False]),
+        min_data_points=pd.Series([7]),
+        low_bound=pd.Series([-10])) -> pd.Series:
     """ Return the probability of an alert to be a SNe Ia using a Random
-    Forest Classifier (sigmoid fit) on ELaSTICC alert data.
+    Forest Classifier (rainbow fit) on ELaSTICC alert data.
 
     Parameters
     ----------
@@ -329,22 +419,34 @@
     midPointTai: Spark DataFrame Column
         JD times (vectors of floats)
     filterName: Spark DataFrame Column
         Filter IDs (vectors of str)
-    psFlux, psFluxErr: Spark DataFrame Columns
-        SNANA calibrated flux, and 1-sigma error (vectors of floats)
-    meta: list
-        Additional features using metadata from ELaSTICC
+    cpsFlux, cpsFluxErr: Spark DataFrame Columns
+        Flux from PSF-fit photometry, and 1-sigma error (vectors of floats)
+    snr: Spark DataFrame Column
+        SNR from `diaSource` (float)
+    hostgal_snsep: Spark DataFrame Column
+        `hostgal_snsep` from `diaObject` (float)
+    hostgal_zphot: Spark DataFrame Column
+        `hostgal_zphot` from `diaObject` (float)
     maxduration: Spark DataFrame Column
-        Integer for the maximum duration (in days) of the lightcurve to be
-        classified.
-        Default is None, i.e. no maximum duration
+        Maximum duration in days to consider the object for classification (int).
+        Default is None, meaning no maximum duration is applied.
     model: Spark DataFrame Column, optional
         Path to the trained model. Default is None, in which case the default
         model `data/models/default-model.obj` is loaded.
+    band_wave_aa: Spark DataFrame Column
+        Dictionary with the effective wavelength of each filter.
+        Default is for ELAsTiCC.
+    with_baseline: Spark DataFrame Column
+        Baseline to be considered (bool). Default is False (baseline 0 in flux space).
+    min_data_points: Spark DataFrame Column
+        Minimum number of data points in all filters. Default is 7.
+    low_bound: Spark DataFrame Column
+        Lower bound of FLUXCAL to consider (float). Default is -10.
 
     Returns
     ----------
-    probabilities: 1D np.array of float
-        Probability between 0 (non-Ia) and 1 (Ia).
+    probabilities: Spark DataFrame Column
+        Probability between 0 (non-Ia) and 1 (Ia) for each alert.
 
     Examples
     ----------
@@ -353,9 +455,6 @@
 
     >>> df = spark.read.format('parquet').load(elasticc_alert_sample)
 
-    # Assuming random positions
-    >>> df = df.withColumn('cdsxmatch', F.lit('Unknown'))
-
     # Required alert columns
     >>> what = ['midPointTai', 'filterName', 'psFlux', 'psFluxErr']
@@ -371,17 +470,15 @@
     # Perform the fit + classification (default model)
     >>> args = [F.col(i) for i in what_prefix]
-    >>> args += [F.col('diaObject.ra'), F.col('diaObject.decl')]
-    >>> args += [F.col('diaObject.hostgal_ra'), F.col('diaObject.hostgal_dec')]
+    >>> args += [F.col('diaSource.snr')]
     >>> args += [F.col('diaObject.hostgal_snsep')]
     >>> args += [F.col('diaObject.hostgal_zphot')]
-    >>> args += [F.col('diaObject.hostgal_zphot_err')]
-    >>> df = df.withColumn('pIa', rfscore_sigmoid_elasticc(*args))
+    >>> df = df.withColumn('pIa', rfscore_rainbow_elasticc(*args))
     >>> df.filter(df['pIa'] > 0.5).count()
-    14
+    79
     """
-
+    # dt is a column of floats
     dt = midPointTai.apply(lambda x: np.max(x) - np.min(x))
 
     # Maximum days in the history
@@ -393,42 +490,38 @@
     if len(midPointTai[mask]) == 0:
         return pd.Series(np.zeros(len(midPointTai), dtype=float))
 
-    candid = pd.Series(range(len(midPointTai)))
-    ids = candid[mask]
-
     # Load pre-trained model `clf`
     if model is not None:
         clf = load_scikit_model(model.values[0])
     else:
         curdir = os.path.dirname(os.path.abspath(__file__))
-        model = curdir + '/data/models/earlysnia_elasticc_03AGO2023_2filters.pkl'
-        clf = load_scikit_model(model)
+        model = curdir + '/data/models/elasticc_rainbow_earlyIa.pkl'
+        with open(model, 'rb') as f:
+            clf = pickle.load(f)
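+        # Assumption: the new .pkl is a plain pickle file (hence pickle.load
+        # rather than load_scikit_model); it must be unpickled with a
+        # scikit-learn version compatible with the one used for training.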
+
+    candid = pd.Series(range(len(midPointTai)))
+    ids = candid[mask]
 
     test_features = []
-    for j in ids:
-        pdf = pd.DataFrame.from_dict(
-            {
-                'MJD': midPointTai[j],
-                'FLT': filterName[j],
-                'FLUXCAL': psFlux[j],
-                'FLUXCALERR': psFluxErr[j]
-            }
+    for index in ids:
+        features = extract_features_rainbow(
+            midPointTai.values[index],
+            filterName.values[index],
+            cpsFlux.values[index],
+            cpsFluxErr.values[index],
+            band_wave_aa=band_wave_aa.values[0],
+            with_baseline=with_baseline.values[0],
+            min_data_points=min_data_points.values[0],
+            low_bound=low_bound.values[0]
         )
-        features = get_sigmoid_features_elasticc_perfilter(pdf, list_filters=['u', 'g', 'r', 'i', 'z', 'Y'])
-
-        # Julien added `id`
         meta_feats = [
-            hostgal_dec.values[j],
-            hostgal_ra.values[j],
-            hostgal_snsep.values[j],
-            hostgal_zphot.values[j],
-            hostgal_zphot_err.values[j],
-            ra.values[j],
-            dec.values[j],
+            len(midPointTai.values[index]),
+            snr.values[index],
+            hostgal_snsep.values[index],
+            hostgal_zphot.values[index]
         ]
-
-        test_features.append(np.concatenate((meta_feats, features)))
+        test_features.append(np.array(meta_feats + list(features)))
 
     # Make predictions
     probabilities = clf.predict_proba(test_features)
@@ -449,7 +542,7 @@
     ztf_alert_sample = 'file://{}/data/alerts/datatest'.format(path)
     globs["ztf_alert_sample"] = ztf_alert_sample
 
-    elasticc_alert_sample = 'file://{}/data/alerts/elasticc_sample_seed0.parquet'.format(path)
+    elasticc_alert_sample = 'file://{}/data/alerts/test_elasticc_earlysnia.parquet'.format(path)
     globs["elasticc_alert_sample"] = elasticc_alert_sample
 
     model_path_sigmoid = '{}/data/models/default-model_sigmoid.obj'.format(path)
diff --git a/fink_science/tester.py b/fink_science/tester.py
index a32a91b9..718fa4d0 100644
--- a/fink_science/tester.py
+++ b/fink_science/tester.py
@@ -91,9 +91,10 @@ def spark_unit_tests(global_args: dict = None, verbose: bool = False):
             }
         )
     elif spark.version.startswith('3'):
+        slf4j_pkgs = 'org.slf4j:slf4j-log4j12:1.7.36,org.slf4j:slf4j-simple:1.7.36'
         confdic.update(
             {
-                "spark.jars.packages": 'org.apache.spark:spark-avro_2.12:{}'.format(spark.version)
+                "spark.jars.packages": 'org.apache.spark:spark-avro_2.12:{},{}'.format(spark.version, slf4j_pkgs)
             }
         )
     conf.setMaster("local[2]")
diff --git a/requirements.txt b/requirements.txt
index 06a9a987..cc250e28 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -29,12 +29,16 @@ seaborn
 scikit-image
 
 # Random Forest AL
--e git+https://github.com/emilleishida/fink_sn_activelearning.git@bf8d4e263e02d42781642f872f7bc030c24792bc#egg=fink_sn_activelearning
+-e git+https://github.com/emilleishida/fink_sn_activelearning.git@4f46b3a1e29de45793125452974e71e92c1ea454#egg=fink_sn_activelearning
 
 # KN
 -e git+https://github.com/b-biswas/kndetect@kndetect#egg=kndetect
 
 # SNAD
-light-curve-python>=0.3.5
-light-curve>=0.4.0
+# light-curve-python>=0.3.5
+# light-curve>=0.4.0
+
+# for ELAsTiCC
+light-curve[full]
+
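For reference, the pieces above chain together as follows. This is a minimal sketch, not part of the patch: it assumes a local Spark session and the test parquet file added by this PR, and the column wiring simply mirrors the doctest of rfscore_rainbow_elasticc.

from pyspark.sql import SparkSession, functions as F
from fink_utils.spark.utils import concat_col
from fink_science.random_forest_snia.processor import rfscore_rainbow_elasticc

spark = SparkSession.builder.master("local[2]").getOrCreate()

# Test data shipped with this patch
df = spark.read.format('parquet').load(
    'fink_science/data/alerts/test_elasticc_earlysnia.parquet')

# Concatenate current and historical measurements, as in the doctest
what = ['midPointTai', 'filterName', 'psFlux', 'psFluxErr']
for colname in what:
    df = concat_col(
        df, colname, prefix='c',
        current='diaSource', history='prvDiaForcedSources')

# Light-curve columns first, then the metadata used as extra RF features
args = [F.col('c' + colname) for colname in what]
args += [F.col('diaSource.snr')]
args += [F.col('diaObject.hostgal_snsep'), F.col('diaObject.hostgal_zphot')]

df = df.withColumn('pIa', rfscore_rainbow_elasticc(*args))
df.select('pIa').show(5)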