Issue/78/new updated filters (#79)
* add a file containing functions that apply filters depending on each observatory

* add new file for distribution

* update distribution according to the new filters

* install requirements update

* install requirements update

* force update of fink-filters

* update test data according to the new filters

* test

* save data

* restore test data from main

* add data for the distribution

---------

Co-authored-by: FusRoman <[email protected]>
FusRoman and FusRoman authored Jul 12, 2023
1 parent b192a17 commit 2d9f7e6
Showing 9 changed files with 107 additions and 66 deletions.
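In short, the distribution step switches from the on-axis GRB filters to the observatory-aware multi-messenger filters shipped by fink_filters (hence the forced `fink_filters` upgrade in CI). The import change, summarized from the diff below:

```python
# Old (GRB-only) filters, removed from fink_grb/distribution/distribution.py:
from fink_filters.filter_on_axis_grb.filter import (
    f_bronze_events,
    f_silver_events,
    f_gold_events,
)

# New observatory-aware multi-messenger filters, used by the new
# fink_grb/distribution/apply_filters.py module:
from fink_filters.filter_mm_module.filter import (
    f_grb_bronze_events,
    f_grb_silver_events,
    f_grb_gold_events,
    f_gw_bronze_events,
)
```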
6 changes: 4 additions & 2 deletions .github/workflows/run_test.yml
@@ -103,7 +103,8 @@ jobs:
run: |
echo "PYTHONPATH="${SPARK_HOME}/python/test_coverage:$PYTHONPATH"" >> $GITHUB_ENV
echo "COVERAGE_PROCESS_START="${ROOTPATH}/.coveragerc"" >> $GITHUB_ENV
-python -m pip install .
+python -m pip install . --upgrade
+python -m pip install -U fink_filters
- name: Run test suites
run: |
@@ -159,7 +160,8 @@ jobs:
run: |
echo "PYTHONPATH="${SPARK_HOME}/python/test_coverage:$PYTHONPATH"" >> $GITHUB_ENV
echo "COVERAGE_PROCESS_START="${ROOTPATH}/.coveragerc"" >> $GITHUB_ENV
-python -m pip install .
+python -m pip install . --upgrade
+python -m pip install -U fink_filters
mkdir fink_grb/ci_gcn_test
mkdir fink_grb/ci_join_test
2 changes: 1 addition & 1 deletion fink_grb/__init__.py
@@ -12,6 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__version__ = "0.9"
__version__ = "0.10"
__distribution_schema_version__ = "1.2"
__observatory_schema_version__ = "1.1"
2 changes: 1 addition & 1 deletion fink_grb/conf/distribute_for_test.conf
@@ -13,7 +13,7 @@ hdfs_gcn_storage=/user/roman.le-montagner/gcn_storage/raw
# They can be in local FS (/path/ or files:///path/) or
# in distributed FS (e.g. hdfs:///path/).
# Be careful though to have enough disk space!
-online_ztf_data_prefix=fink_grb/test/test_data/ztf_test/online
+online_ztf_data_prefix=fink_grb/test/test_data/distribution_test_data/

# Prefix path on disk to save GRB join ZTF data (work for both online and offline).
online_grb_data_prefix=fink_grb/test/test_data
2 changes: 1 addition & 1 deletion fink_grb/conftest.py
@@ -219,7 +219,7 @@ def init_spark(doctest_namespace):

doctest_namespace["AlertConsumer"] = AlertConsumer
doctest_namespace["tabulate"] = tabulate
doctest_namespace["ztfxgcn_test"] = "fink_grb/test/test_data"
doctest_namespace["ztfxgcn_test"] = "fink_grb/test/test_data/distribution_test_data/"
doctest_namespace["headers"] = headers
doctest_namespace["maxtimeout"] = maxtimeout
doctest_namespace["myconfig"] = myconfig
90 changes: 90 additions & 0 deletions fink_grb/distribution/apply_filters.py
@@ -0,0 +1,90 @@
from pyspark.sql import functions as F

from fink_utils.broker.distributionUtils import write_to_kafka
from fink_filters.filter_mm_module.filter import (
    f_grb_bronze_events,
    f_grb_silver_events,
    f_grb_gold_events,
    f_gw_bronze_events,
)


def apply_filters(
    df_stream,
    schema,
    tinterval,
    checkpointpath_grb,
    kafka_broker_server,
    username,
    password,
):
    """
    Apply the fink_filters multi-messenger filters to the joined alert stream
    and push each filtered stream to its own Kafka topic.

    Returns the streaming query of the last topic started by write_to_kafka.
    """
    # GRB bronze: selection on the Fink classification, the emitting
    # observatory and the real-bogus score.
    df_grb_bronze = (
        df_stream.withColumn(
            "f_bronze",
            f_grb_bronze_events(
                df_stream["fink_class"], df_stream["observatory"], df_stream["rb"]
            ),
        )
        .filter("f_bronze == True")
        .drop("f_bronze")
    )

    # GRB silver: same inputs as bronze plus the GRB association probability (grb_proba).
    df_grb_silver = (
        df_stream.withColumn(
            "f_silver",
            f_grb_silver_events(
                df_stream["fink_class"],
                df_stream["observatory"],
                df_stream["rb"],
                df_stream["grb_proba"],
            ),
        )
        .filter("f_silver == True")
        .drop("f_silver")
    )

    # GRB gold: same inputs as silver plus the rate column.
    df_grb_gold = (
        df_stream.withColumn(
            "f_gold",
            f_grb_gold_events(
                df_stream["fink_class"],
                df_stream["observatory"],
                df_stream["rb"],
                df_stream["grb_proba"],
                df_stream["rate"],
            ),
        )
        .filter("f_gold == True")
        .drop("f_gold")
    )

    # GW bronze: selection on the Fink classification, the emitting
    # observatory and the real-bogus score.
    df_gw_bronze = (
        df_stream.withColumn(
            "f_bronze",
            f_gw_bronze_events(
                df_stream["fink_class"], df_stream["observatory"], df_stream["rb"]
            ),
        )
        .filter("f_bronze == True")
        .drop("f_bronze")
    )

    # One Kafka topic and one checkpoint directory per filter.
    for df_filter, topicname in [
        (df_grb_bronze, "fink_grb_bronze"),
        (df_grb_silver, "fink_grb_silver"),
        (df_grb_gold, "fink_grb_gold"),
        (df_gw_bronze, "fink_gw_bronze"),
    ]:
        checkpointpath_topic = checkpointpath_grb + "/{}_checkpoint".format(topicname)
        grb_stream_distribute = write_to_kafka(
            df_filter,
            F.lit(schema),
            kafka_broker_server,
            username,
            password,
            topicname,
            checkpointpath_topic,
            tinterval,
        )

    return grb_stream_distribute
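For orientation, a minimal sketch of how this helper might be called. The streaming DataFrame, schema, broker address and credentials below are illustrative placeholders, not values from the repository; in practice `grb_distribution` in fink_grb/distribution/distribution.py builds them from its configuration:

```python
from fink_grb.distribution.apply_filters import apply_filters

# Assumed to exist upstream: df_grb_stream, a structured-streaming DataFrame
# exposing the columns used by the filters (fink_class, observatory, rb,
# grb_proba, rate), and schema, the alert schema forwarded to write_to_kafka.
stream_query = apply_filters(
    df_grb_stream,
    schema,
    tinterval="30",                        # trigger interval, as passed to write_to_kafka
    checkpointpath_grb="/tmp/grb_ckpt",    # a "<topic>_checkpoint" directory is created per topic
    kafka_broker_server="localhost:9092",  # placeholder broker
    username="fink",                       # placeholder credentials
    password="********",
)
# The returned object is the last streaming query started by write_to_kafka;
# the caller decides how long to keep it running (see exit_after in distribution.py).
```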
71 changes: 10 additions & 61 deletions fink_grb/distribution/distribution.py
@@ -4,15 +4,8 @@
import json

from fink_utils.broker.sparkUtils import init_sparksession, connect_to_raw_database
from fink_utils.broker.distributionUtils import write_to_kafka
from importlib_resources import files

from fink_filters.filter_on_axis_grb.filter import (
f_bronze_events,
f_silver_events,
f_gold_events,
)

import fink_grb

from fink_grb.utils.fun_utils import (
@@ -23,8 +16,7 @@
import fink_grb.utils.application as apps
from fink_grb.init import get_config, init_logging, return_verbose_level
from fink_grb.utils.fun_utils import build_spark_submit

from pyspark.sql import functions as F
from fink_grb.distribution.apply_filters import apply_filters


def grb_distribution(
@@ -118,61 +110,18 @@ def grb_distribution(
cnames[
cnames.index("triggerTimeUTC")
] = "cast(triggerTimeUTC as string) as triggerTimeUTC"
df_grb_stream = df_grb_stream.selectExpr(cnames)

df_bronze = (
df_grb_stream.withColumn(
"f_bronze",
f_bronze_events(df_grb_stream["fink_class"], df_grb_stream["rb"]),
)
.filter("f_bronze == True")
.drop("f_bronze")
)

df_silver = (
df_grb_stream.withColumn(
"f_silver",
f_silver_events(
df_grb_stream["fink_class"],
df_grb_stream["rb"],
df_grb_stream["grb_proba"],
),
)
.filter("f_silver == True")
.drop("f_silver")
)

df_gold = (
df_grb_stream.withColumn(
"f_gold",
f_gold_events(
df_grb_stream["fink_class"],
df_grb_stream["rb"],
df_grb_stream["grb_proba"],
df_grb_stream["rate"],
),
)
.filter("f_gold == True")
.drop("f_gold")
grb_stream_distribute = apply_filters(
df_grb_stream,
schema,
tinterval,
checkpointpath_grb,
kafka_broker_server,
username,
password,
)

for df_filter, topicname in [
(df_bronze, "fink_grb_bronze"),
(df_silver, "fink_grb_silver"),
(df_gold, "fink_grb_gold"),
]:
df_filter = df_filter.selectExpr(cnames)
checkpointpath_topic = checkpointpath_grb + "/{}_checkpoint".format(topicname)
grb_stream_distribute = write_to_kafka(
df_filter,
F.lit(schema),
kafka_broker_server,
username,
password,
topicname,
checkpointpath_topic,
tinterval,
)

# Keep the Streaming running until something or someone ends it!
if exit_after is not None:
time.sleep(int(exit_after))
3 binary files changed (contents not shown).
