From edffb0643ebbaa02c465e3edad1db81edf58c3b5 Mon Sep 17 00:00:00 2001
From: Julien
Date: Tue, 30 Jan 2024 13:08:44 +0100
Subject: [PATCH] Cache only required fields when computing statistics (#792)

* Cache only required fields when computing statistics

* Update conf file with dependencies

* Use filter for SIMBAD statistics (#797)

* Use filter for SIMBAD statistics

* Use fink-utils definition of list of extragalactic sources. Closes #630

* Fix missing import

* Cast numpy array into list for Spark operation
---
 .github/workflows/test.yml |  2 +-
 bin/daily_stats.py         | 91 ++++++++++++++++++--------------------------
 conf/fink.conf.dev         | 11 +++--
 3 files changed, 50 insertions(+), 54 deletions(-)

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 9a70dd25..e75d3a68 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -65,7 +65,7 @@ jobs:
           cd $USRLIBS
           source scripts/start_services.sh --kafka-version ${KAFKA_VERSION} --hbase-version ${HBASE_VERSION}
           cd $FINK_HOME
-          fink_test -c conf/fink.conf.dev --unit-tests
+          fink_test -c conf/fink.conf.dev --unit-tests --db-integration
       - name: Run test suites [prod]
         if: matrix.container == 'julienpeloton/fink-ci:prod'
         run: |
diff --git a/bin/daily_stats.py b/bin/daily_stats.py
index d2fe8927..fe448fa0 100644
--- a/bin/daily_stats.py
+++ b/bin/daily_stats.py
@@ -20,12 +20,17 @@
 import numpy as np
 import pandas as pd
 
+import pyspark.sql.functions as F
+
 from fink_broker.sparkUtils import init_sparksession
 from fink_broker.hbaseUtils import push_to_hbase
 from fink_broker.parser import getargs
 from fink_broker.loggingUtils import get_fink_logger, inspect_application
 
 from fink_filters.classification import extract_fink_classification
+from fink_filters.filter_simbad_candidates.filter import simbad_candidates
+
+from fink_utils.xmatch.simbad import return_list_of_eg_host
 
 
 def main():
@@ -58,50 +63,7 @@ def main():
     df_raw = spark.read.format('parquet').load(input_raw)
     df_sci = spark.read.format('parquet').load(input_science)
 
-    df_sci = df_sci.cache()
-
-    # Number of alerts
-    n_raw_alert = df_raw.count()
-    n_sci_alert = df_sci.count()
-
-    out_dic = {}
-    out_dic['raw'] = n_raw_alert
-    out_dic['sci'] = n_sci_alert
-
-    # matches with SIMBAD
-    n_simbad = df_sci.select('cdsxmatch')\
-        .filter(df_sci['cdsxmatch'] != 'Unknown')\
-        .count()
-
-    out_dic['simbad_tot'] = n_simbad
-
-    # Alerts with a close-by candidate host-galaxy
-    list_simbad_galaxies = [
-        "galaxy",
-        "Galaxy",
-        "EmG",
-        "Seyfert",
-        "Seyfert_1",
-        "Seyfert_2",
-        "BlueCompG",
-        "StarburstG",
-        "LSB_G",
-        "HII_G",
-        "High_z_G",
-        "GinPair",
-        "GinGroup",
-        "BClG",
-        "GinCl",
-        "PartofG",
-    ]
-
-    n_simbad_gal = df_sci.select('cdsxmatch')\
-        .filter(df_sci['cdsxmatch'].isin(list_simbad_galaxies))\
-        .count()
-
-    out_dic['simbad_gal'] = n_simbad_gal
-
-    df_class = df_sci.withColumn(
+    df_sci = df_sci.withColumn(
         'class',
         extract_fink_classification(
             df_sci['cdsxmatch'],
@@ -120,26 +82,57 @@ def main():
         )
     )
 
-    out_class = df_class.groupBy('class').count().collect()
+    cols = [
+        'cdsxmatch',
+        'candidate.field',
+        'candidate.fid',
+        'candidate.jd',
+        'class'
+    ]
+    df_sci = df_sci.select(cols).cache()
+
+    # Number of alerts
+    n_raw_alert = df_raw.count()
+    n_sci_alert = df_sci.count()
+
+    out_dic = {}
+    out_dic['raw'] = n_raw_alert
+    out_dic['sci'] = n_sci_alert
+
+    # matches with SIMBAD
+    n_simbad = df_sci.withColumn(
+        "is_simbad",
+        simbad_candidates("cdsxmatch")
+    ).filter(F.col("is_simbad")).count()
+
+    out_dic['simbad_tot'] = n_simbad
+
+    n_simbad_gal = df_sci.select('cdsxmatch')\
+        .filter(df_sci['cdsxmatch'].isin(list(return_list_of_eg_host())))\
+        .count()
+
+    out_dic['simbad_gal'] = n_simbad_gal
+
+    out_class = df_sci.groupBy('class').count().collect()
     out_class_ = [o.asDict() for o in out_class]
     out_class_ = [list(o.values()) for o in out_class_]
     for kv in out_class_:
         out_dic[kv[0]] = kv[1]
 
     # Number of fields
-    n_field = df_sci.select('candidate.field').distinct().count()
+    n_field = df_sci.select('field').distinct().count()
 
     out_dic['fields'] = n_field
 
     # number of measurements per band
-    n_g = df_sci.select('candidate.fid').filter('fid == 1').count()
-    n_r = df_sci.select('candidate.fid').filter('fid == 2').count()
+    n_g = df_sci.select('fid').filter('fid == 1').count()
+    n_r = df_sci.select('fid').filter('fid == 2').count()
 
     out_dic['n_g'] = n_g
     out_dic['n_r'] = n_r
 
     # Number of exposures
-    n_exp = df_sci.select('candidate.jd').distinct().count()
+    n_exp = df_sci.select('jd').distinct().count()
 
     out_dic['exposures'] = n_exp
 
diff --git a/conf/fink.conf.dev b/conf/fink.conf.dev
index 96d72957..076aaa4c 100644
--- a/conf/fink.conf.dev
+++ b/conf/fink.conf.dev
@@ -1,4 +1,4 @@
-# Copyright 2018-2022 AstroLab Software
+# Copyright 2018-2024 AstroLab Software
 # Author: Julien Peloton
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -46,11 +46,14 @@ FINK_PACKAGES=\
 org.apache.spark:spark-streaming-kafka-0-10-assembly_2.12:3.4.1,\
 org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1,\
 org.apache.spark:spark-avro_2.12:3.4.1,\
-org.apache.hbase:hbase-shaded-mapreduce:2.4.9
+org.apache.hbase:hbase-shaded-mapreduce:2.2.7,\
+org.slf4j:slf4j-log4j12:1.7.36,\
+org.slf4j:slf4j-simple:1.7.36
 
 # Other dependencies (incl. Scala part of Fink)
-FINK_JARS=${FINK_HOME}/libs/hbase-spark-hbase2.4.8_spark3.4.1_scala2.12.0_hadoop3.3.6.jar,\
-${FINK_HOME}/libs/hbase-spark-protocol-shaded-hbase2.4.8_spark3.4.1_scala2.12.0_hadoop3.3.6.jar
+FINK_JARS=${FINK_HOME}/libs/fink-broker_2.11-1.2.jar,\
+${FINK_HOME}/libs/hbase-spark-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar,\
+${FINK_HOME}/libs/hbase-spark-protocol-shaded-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar
 
 # Time interval between 2 trigger updates (second)
 # i.e. the timing of streaming data processing.
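
Illustrative sketch (not part of the patch): the core of the change is to project the alert DataFrame down to the handful of columns needed by the nightly statistics before caching, instead of caching the full alerts. A minimal, self-contained version of that pattern using only standard PySpark is shown below; the input path and the SparkSession are placeholders, column names follow the patch.

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder path: in the patch this is the night's science alert directory
df_sci = spark.read.format('parquet').load('/path/to/science/night')

# Keep only the fields needed for the statistics, then cache the small frame.
# Nested fields such as 'candidate.fid' are exposed as 'fid' after select().
cols = ['cdsxmatch', 'candidate.field', 'candidate.fid', 'candidate.jd']
df_small = df_sci.select(cols).cache()

stats = {
    'sci': df_small.count(),                                 # number of science alerts
    'fields': df_small.select('field').distinct().count(),   # distinct ZTF fields
    'n_g': df_small.filter(F.col('fid') == 1).count(),       # g-band measurements
    'n_r': df_small.filter(F.col('fid') == 2).count(),       # r-band measurements
    'exposures': df_small.select('jd').distinct().count(),   # distinct exposure times
}

Every count after the projection touches only the cached, few-column DataFrame, which is what the patch relies on to keep the memory footprint small.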