Merge branch '877-automated-e2e-tests'
fjammes committed Sep 16, 2024
2 parents 83ed279 + 00d938f commit c681f64
Showing 19 changed files with 502 additions and 335 deletions.
2 changes: 1 addition & 1 deletion .ciux
@@ -31,7 +31,7 @@ dependencies:
- image: gitlab-registry.in2p3.fr/astrolabsoftware/fink/spark-py:k8s-3.4.1
labels:
build: "true"
- package: github.com/k8s-school/[email protected]rc7
- package: github.com/k8s-school/[email protected]rc9
labels:
itest: "optional"
- package: github.com/astrolabsoftware/finkctl/[email protected]
21 changes: 9 additions & 12 deletions .github/workflows/e2e-common.yml
@@ -73,7 +73,7 @@ jobs:
touch artifacts/empty
fi
echo "IMAGE=$CIUX_IMAGE_URL" >> "$GITHUB_OUTPUT"
- uses: actions/upload-artifact@v2
- uses: actions/upload-artifact@v4
with:
name: docker-artifact
path: artifacts
@@ -89,10 +89,6 @@ jobs:
uses: actions/checkout@v3
with:
fetch-depth: 0
# - name: Setup tmate session
# uses: mxschmitt/action-tmate@v3
# with:
# detached: true
- name: Maximize build space
run: |
echo "Removing unwanted software... "
@@ -123,15 +119,15 @@ jobs:
# v0.20.0 does not work on self-hosted runners
./e2e/prereq-install.sh -k "${{ inputs.kind_version }}"
- name: Download image
uses: actions/download-artifact@v4.1.7
uses: actions/download-artifact@v4
with:
name: docker-artifact
path: artifacts
- name: Load container image inside kind
run: |
if [ -f artifacts/image.tar ]; then
echo "Loading image from archive"
cluster_name=$(ciux get cluster-name $PWD)
cluster_name=$(ciux get clustername $PWD)
kind load image-archive artifacts/image.tar --name "$cluster_name"
node=$(kubectl get nodes --selector=node-role.kubernetes.io/control-plane -o jsonpath='{.items[0].metadata.name}')
docker exec -- $node crictl image
@@ -141,9 +137,10 @@
- name: Run argoCD
run: |
./e2e/argocd.sh
- name: Wait fink-broker to be up and running
run: |
./e2e/wait-fink.sh
- name: Setup tmate session
uses: mxschmitt/action-tmate@v3
with:
detached: true
- name: Check results
run: |
./e2e/check-results.sh
@@ -163,7 +160,7 @@ jobs:
- name: Checkout code
uses: actions/checkout@v2
- name: Download image
uses: actions/download-artifact@v4.1.7
uses: actions/download-artifact@v4
with:
name: docker-artifact
path: artifacts
@@ -198,7 +195,7 @@ jobs:
needs: [build, integration-tests]
steps:
- name: Download image
uses: actions/download-artifact@v4.1.7
uses: actions/download-artifact@v4
with:
name: docker-artifact
path: artifacts
File renamed without changes.
103 changes: 28 additions & 75 deletions bin/distribute.py
@@ -25,19 +25,16 @@
import argparse
import logging
import time
import os

from fink_utils.spark import schema_converter
from fink_broker.parser import getargs
from fink_broker.spark_utils import (
init_sparksession,
connect_to_raw_database,
path_exist,
)
from fink_broker.distribution_utils import get_kafka_df

from fink_broker.logging_utils import init_logger
from fink_utils.spark.utils import concat_col

from fink_utils.spark.utils import apply_user_defined_filter


@@ -60,69 +57,13 @@
]


def launch_fink_mm(spark, args: dict):
"""Manage multimessenger operations
Parameters
----------
spark: SparkSession
Spark Session
args: dict
Arguments from Fink configuration file
Returns
-------
time_spent_in_wait: int
Time spent in waiting for GCN to come
before launching the streaming query.
stream_distrib_list: list of StreamingQuery
List of Spark Streaming queries
"""
if args.noscience:
_LOG.info("No science: fink-mm is not applied")
return 0, []
elif args.mmconfigpath != "no-config":
from fink_mm.init import get_config
from fink_broker.mm_utils import mm2distribute

_LOG.info("Fink-MM configuration file: {args.mmconfigpath}")
config = get_config({"--config": args.mmconfigpath})

# Wait for GCN comming
time_spent_in_wait = 0
stream_distrib_list = []
while time_spent_in_wait < args.exit_after:
mm_path_output = config["PATH"]["online_grb_data_prefix"]
mmtmpdatapath = os.path.join(mm_path_output, "online")

# if there is gcn and ztf data
if path_exist(mmtmpdatapath):
t_before = time.time()
_LOG.info("starting mm2distribute ...")
stream_distrib_list = mm2distribute(spark, config, args)
time_spent_in_wait += time.time() - t_before
break

time_spent_in_wait += 1
time.sleep(1.0)
if stream_distrib_list == []:
_LOG.warning(
f"{mmtmpdatapath} does not exist. mm2distribute could not start before the end of the job."
)
else:
_LOG.info("Time spent in waiting for Fink-MM: {time_spent_in_wait} seconds")
return time_spent_in_wait, stream_distrib_list

_LOG.warning("No configuration found for fink-mm -- not applied")
return 0, []


def main():
parser = argparse.ArgumentParser(description=__doc__)
args = getargs(parser)

# Initialise Spark session
logger = init_logger(args.log_level)

logger.debug("Initialise Spark session")
spark = init_sparksession(
name="distribute_{}_{}".format(args.producer, args.night),
shuffle_partitions=2,
@@ -135,10 +76,10 @@ def main():
args.night
)

# Connect to the TMP science database
logger.debug("Connect to the TMP science database")
df = connect_to_raw_database(scitmpdatapath, scitmpdatapath, latestfirst=False)

# Cast fields to ease the distribution
logger.debug("Cast fields to ease the distribution")
cnames = df.columns

if "brokerEndProcessTimestamp" in cnames:
@@ -173,7 +114,7 @@ def main():
"struct(lc_features_r.*) as lc_features_r"
)

# Retrieve time-series information
logger.debug("Retrieve time-series information")
to_expand = [
"jd",
"fid",
@@ -186,7 +127,7 @@
"diffmaglim",
]

# Append temp columns with historical + current measurements
logger.debug("Append temp columns with historical + current measurements")
prefix = "c"
for colname in to_expand:
df = concat_col(df, colname, prefix=prefix)
@@ -206,23 +147,28 @@
# The topic name is the filter name
topicname = args.substream_prefix + userfilter.split(".")[-1] + "_ztf"

# Apply user-defined filter
if args.noscience:
logger.debug(
"Do not apply user-defined filter %s in no-science mode", userfilter
)
df_tmp = df
else:
logger.debug("Apply user-defined filter %s", userfilter)
df_tmp = apply_user_defined_filter(df, userfilter, _LOG)

# Wrap alert data
logger.debug("Wrap alert data")
df_tmp = df_tmp.selectExpr(cnames)

# get schema from the streaming dataframe to
# avoid non-nullable bug #852
schema = schema_converter.to_avro(df_tmp.schema)

# Get the DataFrame for publishing to Kafka (avro serialized)
logger.debug(
"Get the DataFrame for publishing to Kafka (avro serialized): %s", df_tmp
)
df_kafka = get_kafka_df(df_tmp, key=schema, elasticc=False)

# Ensure that the topic(s) exist on the Kafka Server)
logger.debug("Ensure that the topic '%s' exist on the Kafka Server", topicname)
disquery = (
df_kafka.writeStream.format("kafka")
.option("kafka.bootstrap.servers", broker_list)
@@ -239,20 +185,27 @@ def main():
.start()
)

time_spent_in_wait, stream_distrib_list = launch_fink_mm(spark, args)
if args.noscience:
logger.info("Do not perform multi-messenger operations")
time_spent_in_wait, stream_distrib_list = 0, None
else:
logger.debug("Perform multi-messenger operations")
from fink_broker.mm_utils import distribute_launch_fink_mm

time_spent_in_wait, stream_distrib_list = distribute_launch_fink_mm(spark, args)

# Keep the Streaming running until something or someone ends it!
if args.exit_after is not None:
logger.debug("Keep the Streaming running until something or someone ends it!")
remaining_time = args.exit_after - time_spent_in_wait
remaining_time = remaining_time if remaining_time > 0 else 0
time.sleep(remaining_time)
disquery.stop()
if stream_distrib_list != []:
for stream in stream_distrib_list:
stream.stop()
_LOG.info("Exiting the distribute service normally...")
logger.info("Exiting the distribute service normally...")
else:
# Wait for the end of queries
logger.debug("Wait for the end of queries")
spark.streams.awaitAnyTermination()


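Note on the relocated multimessenger helper: the diff above deletes the inline launch_fink_mm from bin/distribute.py and has main() call fink_broker.mm_utils.distribute_launch_fink_mm instead, with the no-science short-circuit now handled in main(). The relocated helper itself is not part of this diff; the sketch below is a hedged reconstruction from the deleted launch_fink_mm body, with the noscience branch dropped, the two log calls given the f-string prefixes the deleted code was missing, and the output path computed before the wait loop so the timeout warning cannot reference an unbound name. The module path and the direct call to mm2distribute are assumptions based on the new import in main().

# fink_broker/mm_utils.py -- hypothetical reconstruction, not the actual file
import logging
import os
import time

from fink_broker.spark_utils import path_exist

_LOG = logging.getLogger(__name__)


def distribute_launch_fink_mm(spark, args):
    """Wait for GCN data, then start the Fink-MM distribution stream.

    Parameters
    ----------
    spark: SparkSession
        Spark Session
    args: dict
        Arguments from the Fink configuration file

    Returns
    -------
    time_spent_in_wait: int
        Time spent waiting for GCN data before the streaming query started.
    stream_distrib_list: list of StreamingQuery
        Spark streaming queries started by mm2distribute (empty on timeout).
    """
    if args.mmconfigpath == "no-config":
        _LOG.warning("No configuration found for fink-mm -- not applied")
        return 0, []

    from fink_mm.init import get_config

    # f-string prefix added; the deleted code logged the literal placeholder
    _LOG.info(f"Fink-MM configuration file: {args.mmconfigpath}")
    config = get_config({"--config": args.mmconfigpath})

    # Computed once, before the loop, so the warning below is always defined
    mm_path_output = config["PATH"]["online_grb_data_prefix"]
    mmtmpdatapath = os.path.join(mm_path_output, "online")

    # Poll until GCN and ZTF data are both present, or the job deadline hits
    time_spent_in_wait = 0
    stream_distrib_list = []
    while time_spent_in_wait < args.exit_after:
        if path_exist(mmtmpdatapath):
            t_before = time.time()
            _LOG.info("starting mm2distribute ...")
            # mm2distribute is defined elsewhere in this module, per the
            # import the deleted code did from fink_broker.mm_utils
            stream_distrib_list = mm2distribute(spark, config, args)
            time_spent_in_wait += time.time() - t_before
            break
        time_spent_in_wait += 1
        time.sleep(1.0)

    if stream_distrib_list == []:
        _LOG.warning(
            f"{mmtmpdatapath} does not exist. "
            "mm2distribute could not start before the end of the job."
        )
    else:
        _LOG.info(f"Time spent in waiting for Fink-MM: {time_spent_in_wait} seconds")
    return time_spent_in_wait, stream_distrib_list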