Cache only required fields when computing statistics #792

Merged (3 commits) on Jan 30, 2024

2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -65,7 +65,7 @@ jobs:
           cd $USRLIBS
           source scripts/start_services.sh --kafka-version ${KAFKA_VERSION} --hbase-version ${HBASE_VERSION}
           cd $FINK_HOME
-          fink_test -c conf/fink.conf.dev --unit-tests
+          fink_test -c conf/fink.conf.dev --unit-tests --db-integration
       - name: Run test suites [prod]
         if: matrix.container == 'julienpeloton/fink-ci:prod'
         run: |
91 changes: 42 additions & 49 deletions bin/daily_stats.py
@@ -20,12 +20,17 @@
 import numpy as np
 import pandas as pd

+import pyspark.sql.functions as F
+
 from fink_broker.sparkUtils import init_sparksession
 from fink_broker.hbaseUtils import push_to_hbase
 from fink_broker.parser import getargs
 from fink_broker.loggingUtils import get_fink_logger, inspect_application

 from fink_filters.classification import extract_fink_classification
+from fink_filters.filter_simbad_candidates.filter import simbad_candidates
+
+from fink_utils.xmatch.simbad import return_list_of_eg_host


 def main():
@@ -58,50 +58,7 @@ def main():
     df_raw = spark.read.format('parquet').load(input_raw)
     df_sci = spark.read.format('parquet').load(input_science)

-    df_sci = df_sci.cache()
-
-    # Number of alerts
-    n_raw_alert = df_raw.count()
-    n_sci_alert = df_sci.count()
-
-    out_dic = {}
-    out_dic['raw'] = n_raw_alert
-    out_dic['sci'] = n_sci_alert
-
-    # matches with SIMBAD
-    n_simbad = df_sci.select('cdsxmatch')\
-        .filter(df_sci['cdsxmatch'] != 'Unknown')\
-        .count()
-
-    out_dic['simbad_tot'] = n_simbad
-
-    # Alerts with a close-by candidate host-galaxy
-    list_simbad_galaxies = [
-        "galaxy",
-        "Galaxy",
-        "EmG",
-        "Seyfert",
-        "Seyfert_1",
-        "Seyfert_2",
-        "BlueCompG",
-        "StarburstG",
-        "LSB_G",
-        "HII_G",
-        "High_z_G",
-        "GinPair",
-        "GinGroup",
-        "BClG",
-        "GinCl",
-        "PartofG",
-    ]
-
-    n_simbad_gal = df_sci.select('cdsxmatch')\
-        .filter(df_sci['cdsxmatch'].isin(list_simbad_galaxies))\
-        .count()
-
-    out_dic['simbad_gal'] = n_simbad_gal
-
-    df_class = df_sci.withColumn(
+    df_sci = df_sci.withColumn(
         'class',
         extract_fink_classification(
             df_sci['cdsxmatch'],
@@ -120,26 +82,57 @@
         )
     )

-    out_class = df_class.groupBy('class').count().collect()
+    cols = [
+        'cdsxmatch',
+        'candidate.field',
+        'candidate.fid',
+        'candidate.jd',
+        'class'
+    ]
+    df_sci = df_sci.select(cols).cache()
+
+    # Number of alerts
+    n_raw_alert = df_raw.count()
+    n_sci_alert = df_sci.count()
+
+    out_dic = {}
+    out_dic['raw'] = n_raw_alert
+    out_dic['sci'] = n_sci_alert
+
+    # matches with SIMBAD
+    n_simbad = df_sci.withColumn(
+        "is_simbad",
+        simbad_candidates("cdsxmatch")
+    ).filter(F.col("is_simbad")).count()
+
+    out_dic['simbad_tot'] = n_simbad
+
+    n_simbad_gal = df_sci.select('cdsxmatch')\
+        .filter(df_sci['cdsxmatch'].isin(list(return_list_of_eg_host())))\
+        .count()
+
+    out_dic['simbad_gal'] = n_simbad_gal
+
+    out_class = df_sci.groupBy('class').count().collect()
     out_class_ = [o.asDict() for o in out_class]
     out_class_ = [list(o.values()) for o in out_class_]
     for kv in out_class_:
         out_dic[kv[0]] = kv[1]

     # Number of fields
-    n_field = df_sci.select('candidate.field').distinct().count()
+    n_field = df_sci.select('field').distinct().count()

     out_dic['fields'] = n_field

     # number of measurements per band
-    n_g = df_sci.select('candidate.fid').filter('fid == 1').count()
-    n_r = df_sci.select('candidate.fid').filter('fid == 2').count()
+    n_g = df_sci.select('fid').filter('fid == 1').count()
+    n_r = df_sci.select('fid').filter('fid == 2').count()

     out_dic['n_g'] = n_g
     out_dic['n_r'] = n_r

     # Number of exposures
-    n_exp = df_sci.select('candidate.jd').distinct().count()
+    n_exp = df_sci.select('jd').distinct().count()

     out_dic['exposures'] = n_exp

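For readers skimming the diff: the key change above is that the science DataFrame is now pruned to the few columns the statistics actually use (cdsxmatch, candidate.field, candidate.fid, candidate.jd and the derived class) before cache() is called, so Spark only materialises that narrow projection instead of the full alert packets. Below is a minimal, self-contained PySpark sketch of that pattern; the schema and values are toy stand-ins, not the real ZTF alert schema.

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local[1]").appName("toy_stats").getOrCreate()

# Toy alerts with a nested 'candidate' struct standing in for the alert packet.
df = spark.createDataFrame(
    [
        ("Seyfert_1", (650, 1, 2460340.5)),
        ("Unknown", (651, 2, 2460340.6)),
        ("SN candidate", (650, 1, 2460340.5)),
    ],
    "cdsxmatch string, candidate struct<field:int, fid:int, jd:double>",
)

# Keep only the columns needed downstream, then cache the narrow projection.
cols = ["cdsxmatch", "candidate.field", "candidate.fid", "candidate.jd"]
df_small = df.select(cols).cache()

# Every statistic below reuses the same cached subset.
n_alerts = df_small.count()
n_g = df_small.filter(F.col("fid") == 1).count()
n_fields = df_small.select("field").distinct().count()
n_exposures = df_small.select("jd").distinct().count()
print(n_alerts, n_g, n_fields, n_exposures)

Selecting before cache() matters because Spark stores exactly the projected columns in memory, so the repeated count() actions that follow reuse a much smaller cached dataset than full alert packets would require.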
11 changes: 7 additions & 4 deletions conf/fink.conf.dev
@@ -1,4 +1,4 @@
-# Copyright 2018-2022 AstroLab Software
+# Copyright 2018-2024 AstroLab Software
 # Author: Julien Peloton
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -46,11 +46,14 @@ FINK_PACKAGES=\
 org.apache.spark:spark-streaming-kafka-0-10-assembly_2.12:3.4.1,\
 org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1,\
 org.apache.spark:spark-avro_2.12:3.4.1,\
-org.apache.hbase:hbase-shaded-mapreduce:2.4.9
+org.apache.hbase:hbase-shaded-mapreduce:2.2.7,\
+org.slf4j:slf4j-log4j12:1.7.36,\
+org.slf4j:slf4j-simple:1.7.36

 # Other dependencies (incl. Scala part of Fink)
-FINK_JARS=${FINK_HOME}/libs/hbase-spark-hbase2.4.8_spark3.4.1_scala2.12.0_hadoop3.3.6.jar,\
-${FINK_HOME}/libs/hbase-spark-protocol-shaded-hbase2.4.8_spark3.4.1_scala2.12.0_hadoop3.3.6.jar
+FINK_JARS=${FINK_HOME}/libs/fink-broker_2.11-1.2.jar,\
+${FINK_HOME}/libs/hbase-spark-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar,\
+${FINK_HOME}/libs/hbase-spark-protocol-shaded-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar

 # Time interval between 2 trigger updates (second)
 # i.e. the timing of streaming data processing.
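As a side note on how such settings are typically consumed: comma-separated Maven coordinates and jar paths like FINK_PACKAGES and FINK_JARS usually map onto the standard Spark options spark.jars.packages and spark.jars (for example via spark-submit --packages/--jars). The sketch below illustrates that wiring under this assumption; the values mirror a subset of the configuration above and the jar path is a placeholder, not the exact mechanism Fink's launch scripts use.

from pyspark.sql import SparkSession

# Subset of conf/fink.conf.dev, joined into the comma-separated strings Spark expects.
fink_packages = ",".join([
    "org.apache.spark:spark-avro_2.12:3.4.1",
    "org.apache.hbase:hbase-shaded-mapreduce:2.2.7",
    "org.slf4j:slf4j-log4j12:1.7.36",
])
fink_jars = "/opt/fink/libs/fink-broker_2.11-1.2.jar"  # placeholder path; must exist locally

spark = (
    SparkSession.builder
    .appName("fink_dev_session")
    .config("spark.jars.packages", fink_packages)  # resolved from Maven at startup
    .config("spark.jars", fink_jars)               # shipped to executors as-is
    .getOrCreate()
)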