diff --git a/.github/workflows/run_test.yml b/.github/workflows/run_test.yml
index 84dac8d..4e91f90 100644
--- a/.github/workflows/run_test.yml
+++ b/.github/workflows/run_test.yml
@@ -12,7 +12,7 @@ jobs:
     strategy:
       matrix:
-        container: ["julienpeloton/fink-ci:prod", "julienpeloton/fink-ci:dev"]
+        container: ["julienpeloton/fink-ci:dev"]
 
     container:
       image: ${{ matrix.container }}
diff --git a/datatest_grb/Readme.md b/datatest/grb/Readme.md
similarity index 100%
rename from datatest_grb/Readme.md
rename to datatest/grb/Readme.md
diff --git a/datatest_grb/grb_test_data.parquet b/datatest/grb/grb_test_data.parquet
similarity index 100%
rename from datatest_grb/grb_test_data.parquet
rename to datatest/grb/grb_test_data.parquet
diff --git a/datatest/.part-00000-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet.crc b/datatest/regular/.part-00000-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet.crc
similarity index 100%
rename from datatest/.part-00000-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet.crc
rename to datatest/regular/.part-00000-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet.crc
diff --git a/datatest/.part-00001-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet.crc b/datatest/regular/.part-00001-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet.crc
similarity index 100%
rename from datatest/.part-00001-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet.crc
rename to datatest/regular/.part-00001-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet.crc
diff --git a/datatest/.part-00002-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet.crc b/datatest/regular/.part-00002-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet.crc
similarity index 100%
rename from datatest/.part-00002-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet.crc
rename to datatest/regular/.part-00002-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet.crc
diff --git a/datatest/.part-00003-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet.crc b/datatest/regular/.part-00003-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet.crc
similarity index 100%
rename from datatest/.part-00003-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet.crc
rename to datatest/regular/.part-00003-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet.crc
diff --git a/datatest/part-00000-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet b/datatest/regular/part-00000-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet
similarity index 100%
rename from datatest/part-00000-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet
rename to datatest/regular/part-00000-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet
diff --git a/datatest/part-00001-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet b/datatest/regular/part-00001-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet
similarity index 100%
rename from datatest/part-00001-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet
rename to datatest/regular/part-00001-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet
diff --git a/datatest/part-00002-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet b/datatest/regular/part-00002-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet
similarity index 100%
rename from datatest/part-00002-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet
rename to datatest/regular/part-00002-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet
diff --git a/datatest/part-00003-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet b/datatest/regular/part-00003-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet
similarity index 100%
rename from datatest/part-00003-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet
rename to datatest/regular/part-00003-8ff58323-1f3e-498f-924b-f88dd471e8d5-c000.snappy.parquet
diff --git a/datatest/spicy_yso/part-00000-c5ad6d0c-d376-4497-b448-6a2b04ba64f3-c000.snappy.parquet b/datatest/spicy_yso/part-00000-c5ad6d0c-d376-4497-b448-6a2b04ba64f3-c000.snappy.parquet
new file mode 100644
index 0000000..073b719
Binary files /dev/null and b/datatest/spicy_yso/part-00000-c5ad6d0c-d376-4497-b448-6a2b04ba64f3-c000.snappy.parquet differ
diff --git a/datatest_tde/tde_ZTF20abfcszi_202008.parquet b/datatest/tde/tde_ZTF20abfcszi_202008.parquet
similarity index 100%
rename from datatest_tde/tde_ZTF20abfcszi_202008.parquet
rename to datatest/tde/tde_ZTF20abfcszi_202008.parquet
diff --git a/fink_filters/classification.py b/fink_filters/classification.py
index 019cc73..627019c 100644
--- a/fink_filters/classification.py
+++ b/fink_filters/classification.py
@@ -84,7 +84,7 @@ def extract_fink_classification_(
 
     Examples
     ---------
-    >>> pdf = pd.read_parquet('datatest')
+    >>> pdf = pd.read_parquet('datatest/regular')
     >>> classification = extract_fink_classification_(
     ...     pdf['cdsxmatch'],
     ...     pdf['roid'],
@@ -184,7 +184,7 @@ def extract_fink_classification_from_pdf(pdf):
     out: pandas.Series of string
         Return a Pandas series with the classification tag
 
-    >>> pdf = pd.read_parquet('datatest')
+    >>> pdf = pd.read_parquet('datatest/regular')
     >>> classification = extract_fink_classification_from_pdf(pdf)
     >>> pdf['class'] = classification
     >>> pdf.groupby('class').count().sort_values('objectId', ascending=False)['objectId'].head(10)
diff --git a/fink_filters/filter_anomaly_notification/filter.py b/fink_filters/filter_anomaly_notification/filter.py
index 5d7c070..bdde399 100755
--- a/fink_filters/filter_anomaly_notification/filter.py
+++ b/fink_filters/filter_anomaly_notification/filter.py
@@ -81,7 +81,7 @@ def anomaly_notification_(
     >>> from fink_science.ad_features.processor import extract_features_ad
     >>> from fink_science.anomaly_detection.processor import anomaly_score
 
-    >>> df = spark.read.format('parquet').load('datatest')
+    >>> df = spark.read.format('parquet').load('datatest/regular')
 
     >>> what = [
     ...     'jd', 'fid', 'magpsf', 'sigmapsf',
@@ -114,9 +114,8 @@ def anomaly_notification_(
     >>> pdf_anomalies = anomaly_notification_(df_proc, threshold=10,
     ...     send_to_tg=False, channel_id=None,
     ...     send_to_slack=False, channel_name=None)
-    >>> print(pdf_anomalies['objectId'].values)
-    ['ZTF21acoshvy' 'ZTF18abgjtxx' 'ZTF19acevxhv' 'ZTF19aboujyi' 'ZTF18aapgymv'
-     'ZTF18abbtxsx' 'ZTF18aaakhsv' 'ZTF18aaypnnd' 'ZTF18aapoack' 'ZTF18abzvnya']
+    >>> print(sorted(pdf_anomalies['objectId'].values))
+    ['ZTF18aaakhsv', 'ZTF18aabeyfi', 'ZTF18aapgymv', 'ZTF18aapoack', 'ZTF18abbtxsx', 'ZTF18abgjtxx', 'ZTF18abzvnya', 'ZTF19aboujyi', 'ZTF19acevxhv', 'ZTF21acoshvy']
 
     # Check cut_coords
     >>> pdf_anomalies = anomaly_notification_(df_proc, threshold=10,
diff --git a/fink_filters/filter_blazar/filter.py b/fink_filters/filter_blazar/filter.py
index 424983a..d51743d 100644
--- a/fink_filters/filter_blazar/filter.py
+++ b/fink_filters/filter_blazar/filter.py
@@ -39,7 +39,7 @@ def blazar(cdsxmatch: Any) -> pd.Series:
     Examples
     ----------
     >>> from fink_utils.spark.utils import apply_user_defined_filter
-    >>> df = spark.read.format('parquet').load('datatest')
+    >>> df = spark.read.format('parquet').load('datatest/regular')
     >>> f = 'fink_filters.filter_blazar.filter.blazar'
     >>> df = apply_user_defined_filter(df, f)
     >>> print(df.count())
diff --git a/fink_filters/filter_early_kn_candidates/filter.py b/fink_filters/filter_early_kn_candidates/filter.py
index 5cac406..77b26a6 100644
--- a/fink_filters/filter_early_kn_candidates/filter.py
+++ b/fink_filters/filter_early_kn_candidates/filter.py
@@ -189,7 +189,7 @@ def early_kn_candidates_(
 
     Examples
     ----------
-    >>> pdf = pd.read_parquet('datatest')
+    >>> pdf = pd.read_parquet('datatest/regular')
     >>> classification = early_kn_candidates_(
     ...     pdf['candidate'].apply(lambda x: x['drb']),
     ...     pdf['candidate'].apply(lambda x: x['classtar']),
@@ -267,7 +267,7 @@ def early_kn_candidates(
     Examples
     ----------
     >>> from fink_utils.spark.utils import apply_user_defined_filter
-    >>> df = spark.read.format('parquet').load('datatest')
+    >>> df = spark.read.format('parquet').load('datatest/regular')
     >>> f = 'fink_filters.filter_early_kn_candidates.filter.early_kn_candidates'
     >>> df = apply_user_defined_filter(df, f)
     >>> print(df.count())
diff --git a/fink_filters/filter_early_sn_candidates/filter.py b/fink_filters/filter_early_sn_candidates/filter.py
index ace7539..6663517 100644
--- a/fink_filters/filter_early_sn_candidates/filter.py
+++ b/fink_filters/filter_early_sn_candidates/filter.py
@@ -51,7 +51,7 @@ def early_sn_candidates_(
 
     Examples
     ----------
-    >>> pdf = pd.read_parquet('datatest')
+    >>> pdf = pd.read_parquet('datatest/regular')
     >>> classification = early_sn_candidates_(
     ...     pdf['cdsxmatch'],
     ...     pdf['snn_snia_vs_nonia'],
@@ -112,7 +112,7 @@ def early_sn_candidates(
     Examples
     ----------
     >>> from fink_utils.spark.utils import apply_user_defined_filter
-    >>> df = spark.read.format('parquet').load('datatest')
+    >>> df = spark.read.format('parquet').load('datatest/regular')
     >>> f = 'fink_filters.filter_early_sn_candidates.filter.early_sn_candidates'
     >>> df = apply_user_defined_filter(df, f)
     >>> print(df.count())
diff --git a/fink_filters/filter_kn_candidates/filter.py b/fink_filters/filter_kn_candidates/filter.py
index 49cfb47..4b135c7 100644
--- a/fink_filters/filter_kn_candidates/filter.py
+++ b/fink_filters/filter_kn_candidates/filter.py
@@ -65,7 +65,7 @@ def kn_candidates_(
 
     Examples
     ----------
-    >>> pdf = pd.read_parquet('datatest')
+    >>> pdf = pd.read_parquet('datatest/regular')
     >>> classification = kn_candidates_(
     ...     pdf['rf_kn_vs_nonkn'],
     ...     pdf['rf_snia_vs_nonia'],
@@ -143,7 +143,7 @@ def kn_candidates(
     ----------
     >>> from fink_utils.spark.utils import concat_col
     >>> from fink_utils.spark.utils import apply_user_defined_filter
-    >>> df = spark.read.format('parquet').load('datatest')
+    >>> df = spark.read.format('parquet').load('datatest/regular')
 
     >>> to_expand = [
     ...     'jd', 'fid', 'magpsf', 'sigmapsf',
diff --git a/fink_filters/filter_known_tde/filter.py b/fink_filters/filter_known_tde/filter.py
index 68ca6c0..fbf24d9 100644
--- a/fink_filters/filter_known_tde/filter.py
+++ b/fink_filters/filter_known_tde/filter.py
@@ -49,7 +49,7 @@ def known_tde_(objectId, ra, dec, radius_arcsec=pd.Series([5])) -> pd.Series:
 
     Examples
     ----------
-    >>> pdf = pd.read_parquet('datatest_tde')
+    >>> pdf = pd.read_parquet('datatest/tde')
     >>> classification = known_tde_(
     ...     pdf['objectId'],
     ...     pdf['candidate'].apply(lambda x: x['ra']),
@@ -126,7 +126,7 @@ def known_tde(objectId, ra, dec) -> pd.Series:
     Examples
     ----------
     >>> from fink_utils.spark.utils import apply_user_defined_filter
-    >>> df = spark.read.format('parquet').load('datatest_tde')
+    >>> df = spark.read.format('parquet').load('datatest/tde')
     >>> f = 'fink_filters.filter_known_tde.filter.known_tde'
     >>> df = apply_user_defined_filter(df, f)
     >>> print(df.count())
diff --git a/fink_filters/filter_microlensing_candidates/filter.py b/fink_filters/filter_microlensing_candidates/filter.py
index 14a6a74..146819e 100644
--- a/fink_filters/filter_microlensing_candidates/filter.py
+++ b/fink_filters/filter_microlensing_candidates/filter.py
@@ -37,7 +37,7 @@ def microlensing_candidates_(mulens) -> pd.Series:
 
     Examples
     ----------
-    >>> pdf = pd.read_parquet('datatest')
+    >>> pdf = pd.read_parquet('datatest/regular')
     >>> classification = microlensing_candidates_(pdf['mulens'])
     >>> print(pdf[classification]['objectId'].values)
     []
@@ -67,7 +67,7 @@ def microlensing_candidates(mulens) -> pd.Series:
     Examples
     ----------
     >>> from fink_utils.spark.utils import apply_user_defined_filter
-    >>> df = spark.read.format('parquet').load('datatest')
+    >>> df = spark.read.format('parquet').load('datatest/regular')
     >>> f = 'fink_filters.filter_microlensing_candidates.filter.microlensing_candidates'
     >>> df = apply_user_defined_filter(df, f)
     >>> print(df.count())
diff --git a/fink_filters/filter_mm_module/filter.py b/fink_filters/filter_mm_module/filter.py
index ae078ca..f2c8ff8 100644
--- a/fink_filters/filter_mm_module/filter.py
+++ b/fink_filters/filter_mm_module/filter.py
@@ -269,5 +269,5 @@ def f_gw_bronze_events(fink_class, observatory, rb):
 
     # Run the test suite
     globs = globals()
-    globs["grb_output_data"] = "datatest_grb/grb_test_data.parquet"
+    globs["grb_output_data"] = "datatest/grb/grb_test_data.parquet"
     spark_unit_tests(globs)
diff --git a/fink_filters/filter_orphan_grb_candidates/filter.py b/fink_filters/filter_orphan_grb_candidates/filter.py
index f8114a1..28e3802 100644
--- a/fink_filters/filter_orphan_grb_candidates/filter.py
+++ b/fink_filters/filter_orphan_grb_candidates/filter.py
@@ -84,7 +84,7 @@ def orphan_grb(jd, jdstarthist, cjdc, cfidc, cssnamenrc, cmagpsfc):
     ----------
     >>> from fink_utils.spark.utils import concat_col
     >>> from fink_utils.spark.utils import apply_user_defined_filter
-    >>> df = spark.read.format('parquet').load('datatest')
+    >>> df = spark.read.format('parquet').load('datatest/regular')
 
     >>> to_expand = ['jd', 'fid', 'ssnamenr', 'magpsf']
 
diff --git a/fink_filters/filter_rate_based_kn_candidates/filter.py b/fink_filters/filter_rate_based_kn_candidates/filter.py
index 6995a40..1385118 100644
--- a/fink_filters/filter_rate_based_kn_candidates/filter.py
+++ b/fink_filters/filter_rate_based_kn_candidates/filter.py
@@ -73,6 +73,7 @@ def perform_classification(
     cjdc, cfidc, cmagpsfc, csigmapsfc, cmagnrc, csigmagnrc, cmagzpscic: Spark DataFrame Columns
         Columns containing history of fid, magpsf, sigmapsf,
         magnr, sigmagnr, magzpsci, isdiffpos as arrays
+
     Returns
     ----------
     out: pandas.Series of bool
@@ -215,6 +216,7 @@ def rate_based_kn_candidates_(
     cjdc, cfidc, cmagpsfc, csigmapsfc, cmagnrc, csigmagnrc, cmagzpscic: Spark DataFrame Columns
         Columns containing history of fid, magpsf, sigmapsf,
         magnr, sigmagnr, magzpsci, isdiffpos as arrays
+
     Returns
     ----------
     out: pandas.Series of bool
@@ -270,6 +272,7 @@ def rate_based_kn_candidates(
     cjdc, cfidc, cmagpsfc, csigmapsfc, cmagnrc, csigmagnrc, cmagzpscic: Spark DataFrame Columns
         Columns containing history of fid, magpsf, sigmapsf,
         magnr, sigmagnr, magzpsci, isdiffpos as arrays
+
     Returns
     ----------
     out: pandas.Series of bool
@@ -280,7 +283,7 @@ def rate_based_kn_candidates(
     ----------
     >>> from fink_utils.spark.utils import concat_col
     >>> from fink_utils.spark.utils import apply_user_defined_filter
-    >>> df = spark.read.format('parquet').load('datatest')
+    >>> df = spark.read.format('parquet').load('datatest/regular')
 
     >>> to_expand = ['jd', 'fid', 'magpsf', 'sigmapsf', 'magnr', 'sigmagnr', 'magzpsci', 'isdiffpos']
diff --git a/fink_filters/filter_rrlyr/filter.py b/fink_filters/filter_rrlyr/filter.py
index b70deb6..b4b5290 100644
--- a/fink_filters/filter_rrlyr/filter.py
+++ b/fink_filters/filter_rrlyr/filter.py
@@ -39,7 +39,7 @@ def rrlyr(cdsxmatch: Any) -> pd.Series:
     Examples
     ----------
     >>> from fink_utils.spark.utils import apply_user_defined_filter
-    >>> df = spark.read.format('parquet').load('datatest')
+    >>> df = spark.read.format('parquet').load('datatest/regular')
     >>> f = 'fink_filters.filter_rrlyr.filter.rrlyr'
     >>> df = apply_user_defined_filter(df, f)
     >>> print(df.count())
diff --git a/fink_filters/filter_simbad_candidates/filter.py b/fink_filters/filter_simbad_candidates/filter.py
index eb7c169..5fa1acc 100644
--- a/fink_filters/filter_simbad_candidates/filter.py
+++ b/fink_filters/filter_simbad_candidates/filter.py
@@ -35,7 +35,7 @@ def simbad_candidates_(cdsxmatch) -> pd.Series:
 
     Examples
     ----------
-    >>> pdf = pd.read_parquet('datatest')
+    >>> pdf = pd.read_parquet('datatest/regular')
     >>> classification = simbad_candidates_(pdf['cdsxmatch'])
     >>> nalerts = len(pdf[classification]['objectId'])
     >>> print(nalerts)
@@ -80,7 +80,7 @@ def simbad_candidates(cdsxmatch) -> pd.Series:
     Examples
     ----------
     >>> from fink_utils.spark.utils import apply_user_defined_filter
-    >>> df = spark.read.format('parquet').load('datatest')
+    >>> df = spark.read.format('parquet').load('datatest/regular')
     >>> f = 'fink_filters.filter_simbad_candidates.filter.simbad_candidates'
     >>> df = apply_user_defined_filter(df, f)
     >>> print(df.count())
diff --git a/fink_filters/filter_simbad_grav_candidates/filter.py b/fink_filters/filter_simbad_grav_candidates/filter.py
index b846861..f09ca3a 100644
--- a/fink_filters/filter_simbad_grav_candidates/filter.py
+++ b/fink_filters/filter_simbad_grav_candidates/filter.py
@@ -35,7 +35,7 @@ def simbad_grav_candidates_(cdsxmatch) -> pd.Series:
 
     Examples
     ----------
-    >>> pdf = pd.read_parquet('datatest')
+    >>> pdf = pd.read_parquet('datatest/regular')
     >>> classification = simbad_grav_candidates_(pdf['cdsxmatch'])
     >>> nalerts = len(pdf[classification]['objectId'])
     >>> print(nalerts)
@@ -80,7 +80,7 @@ def simbad_grav_candidates(cdsxmatch) -> pd.Series:
     Examples
     ----------
     >>> from fink_utils.spark.utils import apply_user_defined_filter
-    >>> df = spark.read.format('parquet').load('datatest')
+    >>> df = spark.read.format('parquet').load('datatest/regular')
     >>> f = 'fink_filters.filter_simbad_grav_candidates.filter.simbad_grav_candidates'
     >>> df = apply_user_defined_filter(df, f)
     >>> print(df.count())
diff --git a/fink_filters/filter_sn_candidates/filter.py b/fink_filters/filter_sn_candidates/filter.py
index 6f68a08..0ad032a 100644
--- a/fink_filters/filter_sn_candidates/filter.py
+++ b/fink_filters/filter_sn_candidates/filter.py
@@ -56,7 +56,7 @@ def sn_candidates_(
 
     Examples
     ----------
-    >>> pdf = pd.read_parquet('datatest')
+    >>> pdf = pd.read_parquet('datatest/regular')
     >>> classification = sn_candidates_(
     ...     pdf['cdsxmatch'],
     ...     pdf['snn_snia_vs_nonia'],
@@ -95,7 +95,7 @@ def sn_candidates(
     Examples
     ----------
     >>> from fink_utils.spark.utils import apply_user_defined_filter
-    >>> df = spark.read.format('parquet').load('datatest')
+    >>> df = spark.read.format('parquet').load('datatest/regular')
     >>> f = 'fink_filters.filter_sn_candidates.filter.sn_candidates'
     >>> df = apply_user_defined_filter(df, f)
     >>> print(df.count())
diff --git a/fink_filters/filter_sso_fink_candidates/filter.py b/fink_filters/filter_sso_fink_candidates/filter.py
index 66e7ad5..3b9b2de 100644
--- a/fink_filters/filter_sso_fink_candidates/filter.py
+++ b/fink_filters/filter_sso_fink_candidates/filter.py
@@ -35,7 +35,7 @@ def sso_fink_candidates_(roid) -> pd.Series:
 
     Examples
     ----------
-    >>> pdf = pd.read_parquet('datatest')
+    >>> pdf = pd.read_parquet('datatest/regular')
     >>> classification = sso_fink_candidates_(pdf['roid'])
     >>> print(len(pdf[classification]['objectId'].values))
     3
@@ -64,7 +64,7 @@ def sso_fink_candidates(roid) -> pd.Series:
     Examples
     ----------
     >>> from fink_utils.spark.utils import apply_user_defined_filter
-    >>> df = spark.read.format('parquet').load('datatest')
+    >>> df = spark.read.format('parquet').load('datatest/regular')
     >>> f = 'fink_filters.filter_sso_fink_candidates.filter.sso_fink_candidates'
     >>> df = apply_user_defined_filter(df, f)
     >>> print(df.count())
diff --git a/fink_filters/filter_sso_ztf_candidates/filter.py b/fink_filters/filter_sso_ztf_candidates/filter.py
index 86463e4..ffbd519 100644
--- a/fink_filters/filter_sso_ztf_candidates/filter.py
+++ b/fink_filters/filter_sso_ztf_candidates/filter.py
@@ -35,7 +35,7 @@ def sso_ztf_candidates_(roid) -> pd.Series:
 
     Examples
     ----------
-    >>> pdf = pd.read_parquet('datatest')
+    >>> pdf = pd.read_parquet('datatest/regular')
     >>> classification = sso_ztf_candidates_(pdf['roid'])
     >>> print(len(pdf[classification]['objectId'].values))
     3
@@ -64,7 +64,7 @@ def sso_ztf_candidates(roid) -> pd.Series:
     Examples
     ----------
     >>> from fink_utils.spark.utils import apply_user_defined_filter
-    >>> df = spark.read.format('parquet').load('datatest')
+    >>> df = spark.read.format('parquet').load('datatest/regular')
     >>> f = 'fink_filters.filter_sso_ztf_candidates.filter.sso_ztf_candidates'
     >>> df = apply_user_defined_filter(df, f)
     >>> print(df.count())
diff --git a/fink_filters/filter_tracklet_candidates/filter.py b/fink_filters/filter_tracklet_candidates/filter.py
index 3614ff5..d6a67b9 100644
--- a/fink_filters/filter_tracklet_candidates/filter.py
+++ b/fink_filters/filter_tracklet_candidates/filter.py
@@ -35,7 +35,7 @@ def tracklet_candidates_(tracklet) -> pd.Series:
 
     Examples
     ----------
-    >>> pdf = pd.read_parquet('datatest')
+    >>> pdf = pd.read_parquet('datatest/regular')
     >>> classification = tracklet_candidates_(pdf['tracklet'])
     >>> print(len(pdf[classification]['objectId'].values))
     2
@@ -64,7 +64,7 @@ def tracklet_candidates(tracklet) -> pd.Series:
     Examples
     ----------
     >>> from fink_utils.spark.utils import apply_user_defined_filter
-    >>> df = spark.read.format('parquet').load('datatest')
+    >>> df = spark.read.format('parquet').load('datatest/regular')
     >>> f = 'fink_filters.filter_tracklet_candidates.filter.tracklet_candidates'
     >>> df = apply_user_defined_filter(df, f)
     >>> print(df.count())
diff --git a/fink_filters/filter_yso_candidates/filter.py b/fink_filters/filter_yso_candidates/filter.py
index 12b9959..469fd82 100644
--- a/fink_filters/filter_yso_candidates/filter.py
+++ b/fink_filters/filter_yso_candidates/filter.py
@@ -40,7 +40,7 @@ def yso_candidates(cdsxmatch: Any) -> pd.Series:
     Examples
     ----------
     >>> from fink_utils.spark.utils import apply_user_defined_filter
-    >>> df = spark.read.format('parquet').load('datatest')
+    >>> df = spark.read.format('parquet').load('datatest/regular')
     >>> f = 'fink_filters.filter_yso_candidates.filter.yso_candidates'
     >>> df = apply_user_defined_filter(df, f)
     >>> print(df.count())
diff --git a/datatest/_SUCCESS b/fink_filters/filter_yso_spicy_candidates/__init__.py
similarity index 100%
rename from datatest/_SUCCESS
rename to fink_filters/filter_yso_spicy_candidates/__init__.py
diff --git a/fink_filters/filter_yso_spicy_candidates/filter.py b/fink_filters/filter_yso_spicy_candidates/filter.py
new file mode 100644
index 0000000..fbcca19
--- /dev/null
+++ b/fink_filters/filter_yso_spicy_candidates/filter.py
@@ -0,0 +1,113 @@
+# Copyright 2024 AstroLab Software
+# Author: Julien Peloton
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+from pyspark.sql.types import BooleanType
+
+from fink_utils.tg_bot.utils import get_curve, msg_handler_tg
+
+from fink_filters.tester import spark_unit_tests
+
+import pandas as pd
+import os
+
+
+@pandas_udf(BooleanType(), PandasUDFType.SCALAR)
+def yso_spicy_candidates(
+    spicy_id, spicy_class, objectId, cjdc, cmagpsfc, csigmapsfc, cdiffmaglimc, cfidc
+) -> pd.Series:
+    """Return alerts with a match in the SPICY catalog
+
+    Parameters
+    ----------
+    spicy_id: Spark DataFrame Column
+        Column containing the ID of the SPICY catalog
+        -1 if no match, otherwise > 0
+
+    Returns
+    ----------
+    out: pandas.Series of bool
+        Return a Pandas DataFrame with the appropriate flag:
+        false for bad alert, and true for good alert.
+
+    Examples
+    ----------
+    >>> from fink_utils.spark.utils import apply_user_defined_filter
+    >>> from fink_utils.spark.utils import concat_col
+    >>> df = spark.read.format('parquet').load('datatest/spicy_yso')
+
+    >>> to_expand = ['jd', 'fid', 'magpsf', 'sigmapsf', 'diffmaglim']
+
+    >>> prefix = 'c'
+    >>> for colname in to_expand:
+    ...     df = concat_col(df, colname, prefix=prefix)
+
+    # quick fix for https://github.com/astrolabsoftware/fink-broker/issues/457
+    >>> for colname in to_expand:
+    ...     df = df.withColumnRenamed('c' + colname, 'c' + colname + 'c')
+
+    >>> f = 'fink_filters.filter_yso_spicy_candidates.filter.yso_spicy_candidates'
+    >>> df = apply_user_defined_filter(df, f)
+    >>> print(df.count())
+    10
+    """
+    mask = spicy_id.values != -1
+
+    pdf = pd.DataFrame(
+        {
+            "objectId": objectId,
+            "magpsf": cmagpsfc,
+            "sigmapsf": csigmapsfc,
+            "diffmaglim": cdiffmaglimc,
+            "fid": cfidc,
+            "jd": cjdc,
+            "spicy_id": spicy_id,
+            "spicy_class": spicy_class,
+        }
+    )
+
+    # Loop over matches
+    if "FINK_TG_TOKEN" in os.environ:
+        payloads = []
+        for _, alert in pdf[mask].iterrows():
+            curve_png = get_curve(
+                jd=alert["jd"],
+                magpsf=alert["magpsf"],
+                sigmapsf=alert["sigmapsf"],
+                diffmaglim=alert["diffmaglim"],
+                fid=alert["fid"],
+                objectId=alert["objectId"],
+                origin="fields",
+            )
+            # pd.DataFrame({'magpsf': alert["magpsf"]})['magpsf']
+            hyperlink = "[{}](https://fink-portal.org/{}): ID {} ({})".format(
+                alert["objectId"],
+                alert["objectId"],
+                alert["spicy_id"],
+                alert["spicy_class"],
+            )
+            payloads.append((hyperlink, None, curve_png))
+
+        if len(payloads) > 0:
+            msg_handler_tg(payloads, channel_id="@spicy_fink", init_msg="")
+
+    return pd.Series(mask)
+
+
+if __name__ == "__main__":
+    """Execute the test suite"""
+
+    # Run the test suite
+    globs = globals()
+    spark_unit_tests(globs)
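
Usage note (not part of the patch itself): a minimal sketch of how the new yso_spicy_candidates filter could be exercised against the bundled datatest/spicy_yso data, following the doctest embedded in the new file. It assumes a local PySpark session and the fink_utils helpers already used throughout this repository; the renaming loop mirrors the quick fix for https://github.com/astrolabsoftware/fink-broker/issues/457 referenced in the doctest.

    from pyspark.sql import SparkSession
    from fink_utils.spark.utils import apply_user_defined_filter, concat_col

    spark = SparkSession.builder.getOrCreate()
    df = spark.read.format("parquet").load("datatest/spicy_yso")

    # Expand the per-alert history columns required by the filter
    for colname in ["jd", "fid", "magpsf", "sigmapsf", "diffmaglim"]:
        df = concat_col(df, colname, prefix="c")
        # quick fix for https://github.com/astrolabsoftware/fink-broker/issues/457
        df = df.withColumnRenamed("c" + colname, "c" + colname + "c")

    f = "fink_filters.filter_yso_spicy_candidates.filter.yso_spicy_candidates"
    df = apply_user_defined_filter(df, f)
    print(df.count())  # 10 alerts expected, per the doctest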