From f5b47fc6475595b70911257e4774c62a654ba80a Mon Sep 17 00:00:00 2001 From: Fabrice Jammes Date: Fri, 30 Aug 2024 16:45:56 +0200 Subject: [PATCH 1/5] Add script to run the whole test from scratch - Add github action dispatch job - Add line_profiler as fink-science dep - Import science module only if needed - Move fink-science dep to science requirements - Set py4j log level to INFO - Add logging messages for raw2science and distribute - Remove useless fink wait script --- .ciux | 2 +- .github/workflows/e2e-common.yml | 13 ++- bin/distribute.py | 97 ++++++---------------- bin/raw2science.py | 137 +++++++++++-------------------- bin/stream2raw.py | 6 +- deps/requirements-science.txt | 8 ++ deps/requirements.txt | 1 - doc/troubleshoot.md | 5 +- e2e/check-results.sh | 8 +- e2e/run.sh | 118 ++++++++++++++++++++++++++ e2e/wait-fink.sh | 85 ------------------- fink_broker/logging_utils.py | 4 + fink_broker/mm_utils.py | 125 ++++++++++++++++++++++++++-- fink_broker/science.py | 58 ++++++------- fink_broker/spark_utils.py | 7 +- 15 files changed, 367 insertions(+), 307 deletions(-) create mode 100755 e2e/run.sh delete mode 100755 e2e/wait-fink.sh diff --git a/.ciux b/.ciux index 1f940310..995be4ad 100644 --- a/.ciux +++ b/.ciux @@ -31,7 +31,7 @@ dependencies: - image: gitlab-registry.in2p3.fr/astrolabsoftware/fink/spark-py:k8s-3.4.1 labels: build: "true" - - package: github.com/k8s-school/ktbx@v1.1.3-rc7 + - package: github.com/k8s-school/ktbx@v1.1.3-rc9 labels: itest: "optional" - package: github.com/astrolabsoftware/finkctl/v3@v3.1.3-rc1 diff --git a/.github/workflows/e2e-common.yml b/.github/workflows/e2e-common.yml index b7b0b91b..2e23de3c 100644 --- a/.github/workflows/e2e-common.yml +++ b/.github/workflows/e2e-common.yml @@ -89,10 +89,6 @@ jobs: uses: actions/checkout@v3 with: fetch-depth: 0 - # - name: Setup tmate session - # uses: mxschmitt/action-tmate@v3 - # with: - # detached: true - name: Maximize build space run: | echo "Removing unwanted software... " @@ -131,7 +127,7 @@ jobs: run: | if [ -f artifacts/image.tar ]; then echo "Loading image from archive" - cluster_name=$(ciux get cluster-name $PWD) + cluster_name=$(ciux get clustername $PWD) kind load image-archive artifacts/image.tar --name "$cluster_name" node=$(kubectl get nodes --selector=node-role.kubernetes.io/control-plane -o jsonpath='{.items[0].metadata.name}') docker exec -- $node crictl image @@ -141,9 +137,10 @@ jobs: - name: Run argoCD run: | ./e2e/argocd.sh - - name: Wait fink-broker to be up and running - run: | - ./e2e/wait-fink.sh + - name: Setup tmate session + uses: mxschmitt/action-tmate@v3 + with: + detached: true - name: Check results run: | ./e2e/check-results.sh diff --git a/bin/distribute.py b/bin/distribute.py index 84d83c94..df8210e7 100644 --- a/bin/distribute.py +++ b/bin/distribute.py @@ -35,9 +35,8 @@ path_exist, ) from fink_broker.distribution_utils import get_kafka_df - +from fink_broker.logging_utils import init_logger from fink_utils.spark.utils import concat_col - from fink_utils.spark.utils import apply_user_defined_filter @@ -60,69 +59,13 @@ ] -def launch_fink_mm(spark, args: dict): - """Manage multimessenger operations - - Parameters - ---------- - spark: SparkSession - Spark Session - args: dict - Arguments from Fink configuration file - - Returns - ------- - time_spent_in_wait: int - Time spent in waiting for GCN to come - before launching the streaming query. 
- stream_distrib_list: list of StreamingQuery - List of Spark Streaming queries - - """ - if args.noscience: - _LOG.info("No science: fink-mm is not applied") - return 0, [] - elif args.mmconfigpath != "no-config": - from fink_mm.init import get_config - from fink_broker.mm_utils import mm2distribute - - _LOG.info("Fink-MM configuration file: {args.mmconfigpath}") - config = get_config({"--config": args.mmconfigpath}) - - # Wait for GCN comming - time_spent_in_wait = 0 - stream_distrib_list = [] - while time_spent_in_wait < args.exit_after: - mm_path_output = config["PATH"]["online_grb_data_prefix"] - mmtmpdatapath = os.path.join(mm_path_output, "online") - - # if there is gcn and ztf data - if path_exist(mmtmpdatapath): - t_before = time.time() - _LOG.info("starting mm2distribute ...") - stream_distrib_list = mm2distribute(spark, config, args) - time_spent_in_wait += time.time() - t_before - break - - time_spent_in_wait += 1 - time.sleep(1.0) - if stream_distrib_list == []: - _LOG.warning( - f"{mmtmpdatapath} does not exist. mm2distribute could not start before the end of the job." - ) - else: - _LOG.info("Time spent in waiting for Fink-MM: {time_spent_in_wait} seconds") - return time_spent_in_wait, stream_distrib_list - - _LOG.warning("No configuration found for fink-mm -- not applied") - return 0, [] - - def main(): parser = argparse.ArgumentParser(description=__doc__) args = getargs(parser) - # Initialise Spark session + logger = init_logger(args.log_level) + + logger.debug("Initialise Spark session") spark = init_sparksession( name="distribute_{}_{}".format(args.producer, args.night), shuffle_partitions=2, @@ -135,10 +78,10 @@ def main(): args.night ) - # Connect to the TMP science database + logger.debug("Connect to the TMP science database") df = connect_to_raw_database(scitmpdatapath, scitmpdatapath, latestfirst=False) - # Cast fields to ease the distribution + logger.debug("Cast fields to ease the distribution") cnames = df.columns if "brokerEndProcessTimestamp" in cnames: @@ -173,7 +116,7 @@ def main(): "struct(lc_features_r.*) as lc_features_r" ) - # Retrieve time-series information + logger.debug("Retrieve time-series information") to_expand = [ "jd", "fid", @@ -186,7 +129,7 @@ def main(): "diffmaglim", ] - # Append temp columns with historical + current measurements + logger.debug("Append temp columns with historical + current measurements") prefix = "c" for colname in to_expand: df = concat_col(df, colname, prefix=prefix) @@ -206,23 +149,25 @@ def main(): # The topic name is the filter name topicname = args.substream_prefix + userfilter.split(".")[-1] + "_ztf" - # Apply user-defined filter + if args.noscience: + logger.debug("Do not apply user-defined filter in no-science mode") df_tmp = df else: + logger.debug("Apply user-defined filter") df_tmp = apply_user_defined_filter(df, userfilter, _LOG) - # Wrap alert data + logger.debug("Wrap alert data") df_tmp = df_tmp.selectExpr(cnames) # get schema from the streaming dataframe to # avoid non-nullable bug #852 schema = schema_converter.to_avro(df_tmp.schema) - # Get the DataFrame for publishing to Kafka (avro serialized) + logger.debug("Get the DataFrame for publishing to Kafka (avro serialized)") df_kafka = get_kafka_df(df_tmp, key=schema, elasticc=False) - # Ensure that the topic(s) exist on the Kafka Server) + logger.debug("Ensure that the topic(s) exist on the Kafka Server") disquery = ( df_kafka.writeStream.format("kafka") .option("kafka.bootstrap.servers", broker_list) @@ -239,10 +184,16 @@ def main(): .start() ) - 
time_spent_in_wait, stream_distrib_list = launch_fink_mm(spark, args) + if args.noscience: + logger.info("Do not perform multi-messenger operations") + time_spent_in_wait, stream_distrib_list = 0, None + else: + logger.debug("Perform multi-messenger operations") + from fink_broker.mm_utils import distribute_launch_fink_mm + time_spent_in_wait, stream_distrib_list = distribute_launch_fink_mm(spark, args) - # Keep the Streaming running until something or someone ends it! if args.exit_after is not None: + logger.debug("Keep the Streaming running until something or someone ends it!") remaining_time = args.exit_after - time_spent_in_wait remaining_time = remaining_time if remaining_time > 0 else 0 time.sleep(remaining_time) @@ -250,9 +201,9 @@ def main(): if stream_distrib_list != []: for stream in stream_distrib_list: stream.stop() - _LOG.info("Exiting the distribute service normally...") + logger.info("Exiting the distribute service normally...") else: - # Wait for the end of queries + logger.debug("Wait for the end of queries") spark.streams.awaitAnyTermination() diff --git a/bin/raw2science.py b/bin/raw2science.py index 243706e1..0ba72eb9 100644 --- a/bin/raw2science.py +++ b/bin/raw2science.py @@ -26,84 +26,15 @@ from pyspark.sql import functions as F import argparse -import logging import time import os from fink_broker import __version__ as fbvsn +from fink_broker.logging_utils import init_logger from fink_broker.parser import getargs from fink_broker.spark_utils import init_sparksession from fink_broker.spark_utils import connect_to_raw_database from fink_broker.partitioning import convert_to_datetime, convert_to_millitime -from fink_broker.spark_utils import path_exist - -from fink_broker.science import apply_science_modules -from fink_broker.science import apply_science_modules_elasticc - -from fink_science import __version__ as fsvsn - -_LOG = logging.getLogger(__name__) - - -def launch_fink_mm(args: dict, scitmpdatapath: str): - """Manage multimessenger operations - - Parameters - ---------- - args: dict - Arguments from Fink configuration file - scitmpdatapath: str - Path to Fink alert data (science) - - Returns - ------- - time_spent_in_wait: int - Time spent in waiting for GCN to come - before launching the streaming query. 
- countquery_mm: StreamingQuery - Spark Streaming query - - """ - if args.noscience: - _LOG.info("No science: fink-mm is not applied") - return 0, None - elif args.mmconfigpath != "no-config": - from fink_mm.init import get_config - from fink_broker.mm_utils import science2mm - - _LOG.info("Fink-MM configuration file: {args.mmconfigpath}") - config = get_config({"--config": args.mmconfigpath}) - gcndatapath = config["PATH"]["online_gcn_data_prefix"] - gcn_path = gcndatapath + "/year={}/month={}/day={}".format( - args.night[0:4], args.night[4:6], args.night[6:8] - ) - - # Wait for GCN comming - time_spent_in_wait = 0 - countquery_mm = None - while time_spent_in_wait < args.exit_after: - # if there is gcn and ztf data - if path_exist(gcn_path) and path_exist(scitmpdatapath): - # Start the GCN x ZTF cross-match stream - t_before = time.time() - _LOG.info("starting science2mm ...") - countquery_mm = science2mm(args, config, gcn_path, scitmpdatapath) - time_spent_in_wait += time.time() - t_before - break - else: - # wait for comming GCN - time_spent_in_wait += 1 - time.sleep(1) - - if countquery_mm is None: - _LOG.warning("science2mm could not start before the end of the job.") - else: - _LOG.info("Time spent in waiting for Fink-MM: {time_spent_in_wait} seconds") - return time_spent_in_wait, countquery_mm - - _LOG.warning("No configuration found for fink-mm -- not applied") - return 0, None - def main(): parser = argparse.ArgumentParser(description=__doc__) @@ -114,7 +45,9 @@ def main(): else: tz = None - # Initialise Spark session + logger = init_logger(args.log_level) + + logger.debug("Initialise Spark session") spark = init_sparksession( name="raw2science_{}_{}".format(args.producer, args.night), shuffle_partitions=2, @@ -148,33 +81,43 @@ def main(): ) # Add library versions + if args.noscience: + fsvsn = "no-science" + else: + # Do not import fink_science if --noscience is set + from fink_science import __version__ as fsvsn + + df = df.withColumn("fink_broker_version", F.lit(fbvsn)).withColumn( "fink_science_version", F.lit(fsvsn) ) - # Switch publisher + logger.debug("Switch publisher") df = df.withColumn("publisher", F.lit("Fink")) - # Apply science modules + logger.debug("Prepare and analyse the data") if "candidate" in df.columns: - # Apply quality cuts - _LOG.info("Applying quality cuts") + logger.info("Apply quality cuts") df = df.filter(df["candidate.nbad"] == 0).filter(df["candidate.rb"] >= 0.55) - # Discard an alert if it is in i band + logger.debug("Discard an alert if it is in i band") df = df.filter(df["candidate.fid"] != 3) - # Apply science modules - _LOG.info("Applying science modules") - df = apply_science_modules(df, args.noscience) + if args.noscience: + logger.info("Do not apply science modules") + else: + # Do not import fink_science if --noscience is set + from fink_broker.science import apply_science_modules + logger.info("Apply science modules") + df = apply_science_modules(df) - # Add ingestion timestamp + logger.debug("Add ingestion timestamp") df = df.withColumn( "brokerEndProcessTimestamp", convert_to_millitime(df["candidate.jd"], F.lit("jd"), F.lit(True)), ) - # Append new rows in the tmp science database + logger.debug("Append new rows in the tmp science database") countquery_science = ( df.writeStream.outputMode("append") .format("parquet") @@ -184,11 +127,17 @@ def main(): .start() ) - # Perform multi-messenger operations - time_spent_in_wait, countquery_mm = launch_fink_mm(args, scitmpdatapath) + if args.noscience: + logger.info("Do not perform 
multi-messenger operations") + time_spent_in_wait, countquery_mm = 0, None + else: + logger.debug("Perform multi-messenger operations") + from fink_broker.mm_utils import raw2science_launch_fink_mm + time_spent_in_wait, countquery_mm = raw2science_launch_fink_mm(args, scitmpdatapath) + - # Keep the Streaming running until something or someone ends it! if args.exit_after is not None: + logger.debug("Keep the Streaming running until something or someone ends it!") # If GCN arrived, wait for the remaining time since the launch of raw2science remaining_time = args.exit_after - time_spent_in_wait remaining_time = remaining_time if remaining_time > 0 else 0 @@ -197,15 +146,21 @@ def main(): if countquery_mm is not None: countquery_mm.stop() else: - # Wait for the end of queries + logger.debug("Wait for the end of queries") spark.streams.awaitAnyTermination() elif "diaSource" in df.columns: - df = apply_science_modules_elasticc(df) + if args.noscience: + logger.fatal("Elasticc data cannot be processed without science modules") + else: + from fink_broker.science import apply_science_modules_elasticc + logger.info("Apply elasticc science modules") + df = apply_science_modules_elasticc(df) + timecol = "diaSource.midPointTai" converter = lambda x: convert_to_datetime(x, F.lit("mjd")) - # re-create partitioning columns if needed. + logger.debug("Re-create partitioning columns if needed") if "timestamp" not in df.columns: df = df.withColumn("timestamp", converter(df[timecol])) @@ -218,7 +173,7 @@ def main(): if "day" not in df.columns: df = df.withColumn("day", F.date_format("timestamp", "dd")) - # Append new rows in the tmp science database + logger.debug("Append new rows in the tmp science database") countquery = ( df.writeStream.outputMode("append") .format("parquet") @@ -229,15 +184,15 @@ def main(): .start() ) - # Keep the Streaming running until something or someone ends it! if args.exit_after is not None: + logger.debug("Keep the Streaming running until something or someone ends it!") time.sleep(args.exit_after) countquery.stop() else: - # Wait for the end of queries + logger.debug("Wait for the end of queries") spark.streams.awaitAnyTermination() - _LOG.info("Exiting the raw2science service normally...") + logger.info("Exiting the raw2science service normally...") if __name__ == "__main__": diff --git a/bin/stream2raw.py b/bin/stream2raw.py index 4213447b..c2c35121 100644 --- a/bin/stream2raw.py +++ b/bin/stream2raw.py @@ -56,7 +56,9 @@ def main(): # FIXME args.log_level should be checked to be both compliant with python and spark! 
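# One possible shape for the check flagged in the FIXME above -- a hypothetical
# helper, not part of this patch: accept only level names understood both by
# Python's logging module and by Spark's sparkContext.setLogLevel(), and fail
# early otherwise.
def check_log_level(level: str) -> str:
    valid_levels = {"DEBUG", "INFO", "WARN", "ERROR", "FATAL"}
    if level.upper() not in valid_levels:
        raise ValueError("Unsupported log level: {}".format(level))
    return level.upper()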
- # Initialise Spark session + logger = init_logger(args.log_level) + + logger.debug("Initialise Spark session") spark = init_sparksession( name="stream2raw_{}_{}".format(args.producer, args.night), shuffle_partitions=2, @@ -64,8 +66,6 @@ def main(): log_level=args.spark_log_level, ) - logger = init_logger(args.log_level) - # debug statements inspect_application(logger) diff --git a/deps/requirements-science.txt b/deps/requirements-science.txt index 4c7582c5..26c1cf6a 100644 --- a/deps/requirements-science.txt +++ b/deps/requirements-science.txt @@ -1,3 +1,11 @@ +# Fink-science dependencies +# WARNING: fink-science pip module is deprecated and fink-science is now installed from source + +git+https://github.com/astrolabsoftware/fink-science@5.15.0 + +# xmatch_cds +line_profiler==4.1.3 + # Active learning -e git+https://github.com/emilleishida/fink_sn_activelearning.git@4f46b3a1e29de45793125452974e71e92c1ea454#egg=actsnfink -e git+https://github.com/COINtoolbox/ActSNClass.git@2c61da91a9d13834d39804fc35aeb3245ba20755#egg=actsnclass diff --git a/deps/requirements.txt b/deps/requirements.txt index 06d75353..74a41919 100644 --- a/deps/requirements.txt +++ b/deps/requirements.txt @@ -19,7 +19,6 @@ fastavro==1.6.0 # Fink core fink_filters>=3.33 -git+https://github.com/astrolabsoftware/fink-science@5.15.0 fink-utils>=0.19.0 fink-spins>=0.3.7 fink-tns>=0.9 diff --git a/doc/troubleshoot.md b/doc/troubleshoot.md index 9b078d10..1a779684 100644 --- a/doc/troubleshoot.md +++ b/doc/troubleshoot.md @@ -12,5 +12,6 @@ kubectl run -it --rm s5cmd --image=peakcom/s5cmd --env AWS_ACCESS_KEY_ID=minio - Interactive access: ```shell kubectl run -it --rm s5cmd --image=peakcom/s5cmd --env AWS_ACCESS_KEY_ID=minio --env AWS_SECRET_ACCESS_KEY=minio123 --env S3_ENDPOINT_URL=https://minio.minio:443 --command -- sh -/s5cmd --log debug --no-verify-ssl ls -H "s3://fink-broker-online/raw/year=2020/month=01/day=01/" -``` \ No newline at end of file +/s5cmd --log debug --no-verify-ssl ls -H "s3://fink-broker-online/raw/2020/01/01/" +/s5cmd --log debug --no-verify-ssl ls "s3://fink-broker-online/*" +``` diff --git a/e2e/check-results.sh b/e2e/check-results.sh index 2fa2b03b..a9fb31a7 100755 --- a/e2e/check-results.sh +++ b/e2e/check-results.sh @@ -38,8 +38,8 @@ while ! 
finkctl wait topics --expected "$expected_topics" --timeout 60s -v1 do echo "Waiting for expected topics: $expected_topics" sleep 5 - kubectl get pods - if [ $(kubectl get pods -l app.kubernetes.io/instance=fink-broker --field-selector=status.phase!=Running | wc -l) -ge 1 ]; + kubectl get pods -n spark + if [ $(kubectl get pods -n spark -l app.kubernetes.io/instance=fink-broker --field-selector=status.phase!=Running | wc -l) -ge 1 ]; then echo "ERROR: fink-broker has crashed" 1>&2 echo "ERROR: enabling interactive access for debugging purpose" 1>&2 @@ -50,6 +50,10 @@ do if [ $count -eq 10 ]; then echo "ERROR: Timeout waiting for topics to be created" 1>&2 kubectl logs -l sparkoperator.k8s.io/launched-by-spark-operator=true --tail -1 + echo "PODS" + kubectl get pods -A + echo "KAFKA TOPICS" + kubectl get kafkatopics -A sleep 7200 exit 1 fi diff --git a/e2e/run.sh b/e2e/run.sh new file mode 100755 index 00000000..08642d9f --- /dev/null +++ b/e2e/run.sh @@ -0,0 +1,118 @@ +#!/bin/bash + +# Run fink-broker e2e tests + +# @author Fabrice Jammes + +set -euxo pipefail + +DIR=$(cd "$(dirname "$0")"; pwd -P) + +usage () { + echo "Usage: $0 [-s]" + echo " -s: Use the science algorithms during the tests" + echo " -c: Cleanup the cluster after the tests" + exit 1 +} + +SUFFIX="noscience" + +token="${TOKEN:-}" + +# Get options for suffix +while getopts hcs opt; do + case ${opt} in + s ) + SUFFIX="" + ;; + c ) + cleanup=true + ;; + h ) + usage + exit 0 + ;; + \? ) + usage + exit 1 + ;; + esac +done + +export SUFFIX +export CIUXCONFIG=$HOME/.ciux/ciux.sh + +cleanup=false +build=false +e2e=false +push=false + +{ +echo "Update source code" +cd $DIR/.. +git pull + +ciux_version=v0.0.4-rc8 +go install github.com/k8s-school/ciux@"$ciux_version" + +echo "Ignite the project using ciux" +mkdir -p ~/.ciux + +# Build step +$DIR/../build.sh -s "$SUFFIX" +build=true + +# e2e tests step +ciux ignite --selector itest $PWD --suffix "$SUFFIX" + +cluster=$(ciux get clustername $DIR/..) +echo "Delete the cluster $cluster if it already exists" +ktbx delete --name "$cluster" || true + +echo "Create a Kubernetes cluster (Kind), Install OLM and ArgoCD operators." +$DIR/prereq-install.sh + +. $CIUXCONFIG +if [ $CIUX_BUILD = true ]; then + kind load docker-image $CIUX_IMAGE_URL --name "$cluster" +fi + +echo "Run ArgoCD to install the whole fink e2e tests stack" +$DIR/argocd.sh + +echo "Check the results of the tests." 
+$DIR/check-results.sh +e2e=true + +echo "Push the image to Container Registry" +$DIR/../push-image.sh +push=true +} + +url="https://api.github.com/repos/astrolabsoftware/fink-broker/dispatches" + +payload="{\"build\": $build,\"e2e\": $e2e,\"push\": $push, \"cluster\": \"$cluster\", \"image\": \"$CIUX_IMAGE_URL\"}" +echo "Payload: $payload" + +if [ -z "$token" ]; then + echo "No token provided, skipping GitHub dispatch" +else + echo "Dispatching event to GitHub" + curl -L \ + -X POST \ + -H "Accept: application/vnd.github+json" \ + -H "Authorization: Bearer $token" \ + -H "X-GitHub-Api-Version: 2022-11-28" \ + $url \ + -d "{\"event_type\":\"e2e-science\",\"client_payload\":$payload}" || echo "ERROR Failed to dispatch event" >&2 +fi + +if [ $cleanup = true -a $e2e = true ]; then + echo "Delete the cluster $cluster" + ktbx delete --name "$cluster" +else + echo "Cluster $cluster kept for debugging" +fi + + + diff --git a/e2e/wait-fink.sh b/e2e/wait-fink.sh deleted file mode 100755 index c2d811bd..00000000 --- a/e2e/wait-fink.sh +++ /dev/null @@ -1,85 +0,0 @@ -#!/bin/bash - -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Launch integration tests for fink-broker - -# @author Fabrice Jammes - -set -euxo pipefail - -DIR=$(cd "$(dirname "$0")"; pwd -P) - -CIUXCONFIG=${CIUXCONFIG:-"$HOME/.ciux/ciux.sh"} -. $CIUXCONFIG - -usage() { - echo "Usage: $0 [-e] [-f finkconfig] [-i image]" - echo " Run end-to-end tests, fink configuration file and fink-broker image are automatically set" - echo " -h: display this help" -} - -# Parse option for finkctl configuration file and fink-broker image -while getopts "h" opt; do - case $opt in - h) usage ; exit 0 ;; - esac -done - -NS=spark - - - -kubectl config set-context --current --namespace="$NS" - -# Wait for Spark pods to be created and warm up -# Debug in case of not expected behaviour -# Science setup is VERY slow to start, because of raw2science-exec pod -# TODO use helm --wait with a pod which monitor the status of the Spark pods? - -# 5 minutes timeout -timeout="300" - -counter=0 -max_retries=3 -# Sometimes spark pods crashes and finktctl wait may fail -# even if Spark pod will be running after a while -# TODO implement the retry in "finkctl wait" -while ! 
finkctl wait tasks --timeout="${timeout}s"; do - if [ $counter -gt $max_retries ]; then - echo "ERROR: unable to start fink-broker in $timeout" - echo "ERROR: enabling interactive access for debugging purpose" - sleep 7200 - exit 1 - fi - echo "Spark applications" - echo "---------------" - kubectl get sparkapplications - kubectl logs -n spark-operator -l app.kubernetes.io/instance=spark-operator - echo "Pods description" - echo "----------------" - kubectl describe pods -l "spark-role in (executor, driver)" - kubectl get pods - echo "ERROR: Spark pods are not running after $timeout, retry $counter/$max_retries" - sleep 60 - timeout=$((timeout+300)) - counter=$((counter+1)) -done - -kubectl describe pods -l "spark-role in (executor, driver)" -kubectl get pods -echo "SUCCESS: fink-broker is running" diff --git a/fink_broker/logging_utils.py b/fink_broker/logging_utils.py index 73223073..c686dfa8 100644 --- a/fink_broker/logging_utils.py +++ b/fink_broker/logging_utils.py @@ -47,6 +47,7 @@ def init_logger(log_level: str = "INFO") -> logging.Logger: # Format of the log message to be printed FORMAT = "%(asctime)-15s " FORMAT += "%(levelname)s " + FORMAT += "%(name)s " FORMAT += "%(funcName)s " FORMAT += "(%(filename)s " FORMAT += "line %(lineno)d): " @@ -65,6 +66,9 @@ def init_logger(log_level: str = "INFO") -> logging.Logger: streamHandler.setFormatter(formatter) logger.addHandler(streamHandler) + # TODO use a configuration file in a configmap to set the log level of the py4j logger + logging.getLogger('py4j').setLevel(logging.INFO) + return logger diff --git a/fink_broker/mm_utils.py b/fink_broker/mm_utils.py index 919d515a..a99a9c60 100644 --- a/fink_broker/mm_utils.py +++ b/fink_broker/mm_utils.py @@ -14,21 +14,136 @@ # limitations under the License. """Utilities for multi-messenger activities""" +import argparse +import configparser +import logging +import os +import time +from typing import Tuple, List + from fink_mm.ztf_join_gcn import ztf_join_gcn_stream, DataMode from fink_mm.distribution.distribution import grb_distribution_stream from astropy.time import Time from datetime import timedelta -from fink_broker.spark_utils import connect_to_raw_database +from fink_broker.spark_utils import connect_to_raw_database, path_exist from fink_broker.logging_utils import init_logger from pyspark.sql.streaming import StreamingQuery -import argparse -import configparser -import os -import time +_LOG = logging.getLogger(__name__) + + +def distribute_launch_fink_mm(spark, args: dict) -> Tuple[int, List]: + """Manage multimessenger operations + + Parameters + ---------- + spark: SparkSession + Spark Session + args: dict + Arguments from Fink configuration file + + Returns + ------- + time_spent_in_wait: int + Time spent in waiting for GCN to come + before launching the streaming query. 
+ stream_distrib_list: list of StreamingQuery + List of Spark Streaming queries + + """ + if args.mmconfigpath != "no-config": + from fink_mm.init import get_config + from fink_broker.mm_utils import mm2distribute + + _LOG.info("Fink-MM configuration file: {args.mmconfigpath}") + config = get_config({"--config": args.mmconfigpath}) + + # Wait for GCN comming + time_spent_in_wait = 0 + stream_distrib_list = [] + while time_spent_in_wait < args.exit_after: + mm_path_output = config["PATH"]["online_grb_data_prefix"] + mmtmpdatapath = os.path.join(mm_path_output, "online") + + # if there is gcn and ztf data + if path_exist(mmtmpdatapath): + t_before = time.time() + _LOG.info("starting mm2distribute ...") + stream_distrib_list = mm2distribute(spark, config, args) + time_spent_in_wait += time.time() - t_before + break + + time_spent_in_wait += 1 + time.sleep(1.0) + if stream_distrib_list == []: + _LOG.warning( + f"{mmtmpdatapath} does not exist. mm2distribute could not start before the end of the job." + ) + else: + _LOG.info("Time spent in waiting for Fink-MM: {time_spent_in_wait} seconds") + return time_spent_in_wait, stream_distrib_list + + _LOG.warning("No configuration found for fink-mm -- not applied") + return 0, [] + +def raw2science_launch_fink_mm(args: dict, scitmpdatapath: str) -> Tuple[int, StreamingQuery]: + """Manage multimessenger operations + + Parameters + ---------- + args: dict + Arguments from Fink configuration file + scitmpdatapath: str + Path to Fink alert data (science) + + Returns + ------- + time_spent_in_wait: int + Time spent in waiting for GCN to come + before launching the streaming query. + countquery_mm: StreamingQuery + Spark Streaming query + + """ + if args.mmconfigpath != "no-config": + from fink_mm.init import get_config + from fink_broker.mm_utils import science2mm + + _LOG.info("Fink-MM configuration file: {args.mmconfigpath}") + config = get_config({"--config": args.mmconfigpath}) + gcndatapath = config["PATH"]["online_gcn_data_prefix"] + gcn_path = gcndatapath + "/year={}/month={}/day={}".format( + args.night[0:4], args.night[4:6], args.night[6:8] + ) + + # Wait for GCN comming + time_spent_in_wait = 0 + countquery_mm = None + while time_spent_in_wait < args.exit_after: + # if there is gcn and ztf data + if path_exist(gcn_path) and path_exist(scitmpdatapath): + # Start the GCN x ZTF cross-match stream + t_before = time.time() + _LOG.info("starting science2mm ...") + countquery_mm = science2mm(args, config, gcn_path, scitmpdatapath) + time_spent_in_wait += time.time() - t_before + break + else: + # wait for comming GCN + time_spent_in_wait += 1 + time.sleep(1) + + if countquery_mm is None: + _LOG.warning("science2mm could not start before the end of the job.") + else: + _LOG.info("Time spent in waiting for Fink-MM: {time_spent_in_wait} seconds") + return time_spent_in_wait, countquery_mm + + _LOG.warning("No configuration found for fink-mm -- not applied") + return 0, None def science2mm( diff --git a/fink_broker/science.py b/fink_broker/science.py index c23ab077..28b9660e 100644 --- a/fink_broker/science.py +++ b/fink_broker/science.py @@ -29,42 +29,34 @@ from fink_broker.tester import spark_unit_tests +# Import of science modules +from fink_science.random_forest_snia.processor import rfscore_sigmoid_full +from fink_science.xmatch.processor import xmatch_cds +from fink_science.xmatch.processor import crossmatch_other_catalog +from fink_science.xmatch.processor import crossmatch_mangrove + +from fink_science.snn.processor import snn_ia +from 
fink_science.microlensing.processor import mulens +from fink_science.asteroids.processor import roid_catcher +from fink_science.nalerthist.processor import nalerthist +from fink_science.kilonova.processor import knscore +from fink_science.ad_features.processor import extract_features_ad +from fink_science.anomaly_detection.processor import anomaly_score + +from fink_science.random_forest_snia.processor import rfscore_rainbow_elasticc +from fink_science.snn.processor import snn_ia_elasticc, snn_broad_elasticc +from fink_science.cats.processor import predict_nn +from fink_science.agn.processor import agn_elasticc +from fink_science.slsn.processor import slsn_elasticc +from fink_science.fast_transient_rate.processor import magnitude_rate +from fink_science.fast_transient_rate import rate_module_output_schema +# from fink_science.t2.processor import t2 # --------------------------------- # Local non-exported definitions -- # --------------------------------- _LOG = logging.getLogger(__name__) - -# Conditional import of science modules -try: - from fink_science.random_forest_snia.processor import rfscore_sigmoid_full - from fink_science.xmatch.processor import xmatch_cds - from fink_science.xmatch.processor import crossmatch_other_catalog - from fink_science.xmatch.processor import crossmatch_mangrove - - from fink_science.snn.processor import snn_ia - from fink_science.microlensing.processor import mulens - from fink_science.asteroids.processor import roid_catcher - from fink_science.nalerthist.processor import nalerthist - from fink_science.kilonova.processor import knscore - from fink_science.ad_features.processor import extract_features_ad - from fink_science.anomaly_detection.processor import anomaly_score - - from fink_science.random_forest_snia.processor import rfscore_rainbow_elasticc - from fink_science.snn.processor import snn_ia_elasticc, snn_broad_elasticc - from fink_science.cats.processor import predict_nn - from fink_science.agn.processor import agn_elasticc - from fink_science.slsn.processor import slsn_elasticc - from fink_science.fast_transient_rate.processor import magnitude_rate - from fink_science.fast_transient_rate import rate_module_output_schema - # from fink_science.t2.processor import t2 -except ImportError as e: - _LOG.warning("Fink science modules are not available. 
") - _LOG.warning(f"exception raised: {e}") - pass - - def dec2theta(dec: float) -> float: """Convert Dec (deg) to theta (rad)""" return np.pi / 2.0 - np.pi / 180.0 * dec @@ -74,7 +66,6 @@ def ra2phi(ra: float) -> float: """Convert RA (deg) to phi (rad)""" return np.pi / 180.0 * ra - @pandas_udf(LongType(), PandasUDFType.SCALAR) def ang2pix(ra: pd.Series, dec: pd.Series, nside: pd.Series) -> pd.Series: """Compute pixel number at given nside @@ -183,7 +174,7 @@ def fake_t2(incol): return pd.Series([out] * len(incol)) -def apply_science_modules(df: DataFrame, noscience: bool = False) -> DataFrame: +def apply_science_modules(df: DataFrame) -> DataFrame: """Load and apply Fink science modules to enrich alert content Focus on ZTF stream @@ -212,9 +203,6 @@ def apply_science_modules(df: DataFrame, noscience: bool = False) -> DataFrame: # apply_science_modules is lazy, so trigger the computation >>> an_alert = df.take(1) """ - if noscience: - return df - # Retrieve time-series information to_expand = [ "jd", diff --git a/fink_broker/spark_utils.py b/fink_broker/spark_utils.py index 189e5f34..e6978554 100644 --- a/fink_broker/spark_utils.py +++ b/fink_broker/spark_utils.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import time +import logging from typing import Tuple from pyspark import SparkContext from pyspark.sql import SparkSession @@ -26,6 +27,10 @@ from fink_broker.avro_utils import readschemafromavrofile from fink_broker.tester import spark_unit_tests +# --------------------------------- +# Local non-exported definitions -- +# --------------------------------- +_LOG = logging.getLogger(__name__) def from_avro(dfcol: Column, jsonformatschema: str) -> Column: """Decode the Avro data contained in a DataFrame column into a struct. 
@@ -324,7 +329,7 @@ def connect_to_raw_database(basepath: str, path: str, latestfirst: bool) -> Data wait_sec = 5 while not path_exist(basepath): - print("Waiting for data to arrive in {}".format(basepath)) + _LOG.info("Waiting for stream2raw to upload data to %s", basepath) time.sleep(wait_sec) # Sleep for longer and longer wait_sec = increase_wait_time(wait_sec) From 673cac153c20de1a6710cdc6267a20c35d7ad6b7 Mon Sep 17 00:00:00 2001 From: Fabrice Jammes Date: Wed, 11 Sep 2024 10:50:51 +0200 Subject: [PATCH 2/5] Update GHA artifact actions --- .github/workflows/e2e-common.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/e2e-common.yml b/.github/workflows/e2e-common.yml index 2e23de3c..78342814 100644 --- a/.github/workflows/e2e-common.yml +++ b/.github/workflows/e2e-common.yml @@ -73,7 +73,7 @@ jobs: touch artifacts/empty fi echo "IMAGE=$CIUX_IMAGE_URL" >> "$GITHUB_OUTPUT" - - uses: actions/upload-artifact@v2 + - uses: actions/upload-artifact@v4 with: name: docker-artifact path: artifacts @@ -119,7 +119,7 @@ jobs: # v0.20.0 does not work on self-hosted runners ./e2e/prereq-install.sh -k "${{ inputs.kind_version }}" - name: Download image - uses: actions/download-artifact@v4.1.7 + uses: actions/download-artifact@v4 with: name: docker-artifact path: artifacts @@ -160,7 +160,7 @@ jobs: - name: Checkout code uses: actions/checkout@v2 - name: Download image - uses: actions/download-artifact@v4.1.7 + uses: actions/download-artifact@v4 with: name: docker-artifact path: artifacts @@ -195,7 +195,7 @@ jobs: needs: [build, integration-tests] steps: - name: Download image - uses: actions/download-artifact@v4.1.7 + uses: actions/download-artifact@v4 with: name: docker-artifact path: artifacts From dc5d3cf5c6d1bfdd1dd10045163b04961a572ded Mon Sep 17 00:00:00 2001 From: Fabrice Jammes Date: Fri, 13 Sep 2024 11:19:56 +0200 Subject: [PATCH 3/5] Document self-hosted CI --- bin/distribute.py | 9 +++-- doc/debug_self_hosted.md | 25 -------------- doc/self_hosted.md | 75 ++++++++++++++++++++++++++++++++++++++++ e2e/run.sh | 3 -- 4 files changed, 79 insertions(+), 33 deletions(-) delete mode 100644 doc/debug_self_hosted.md create mode 100644 doc/self_hosted.md diff --git a/bin/distribute.py b/bin/distribute.py index df8210e7..13dbc3f4 100644 --- a/bin/distribute.py +++ b/bin/distribute.py @@ -149,12 +149,11 @@ def main(): # The topic name is the filter name topicname = args.substream_prefix + userfilter.split(".")[-1] + "_ztf" - if args.noscience: - logger.debug("Do not apply user-defined filter in no-science mode") + logger.debug("Do not apply user-defined filter %s in no-science mode", userfilter) df_tmp = df else: - logger.debug("Apply user-defined filter") + logger.debug("Apply user-defined filter %s", userfilter) df_tmp = apply_user_defined_filter(df, userfilter, _LOG) logger.debug("Wrap alert data") @@ -164,10 +163,10 @@ def main(): # avoid non-nullable bug #852 schema = schema_converter.to_avro(df_tmp.schema) - logger.debug("Get the DataFrame for publishing to Kafka (avro serialized)") + logger.debug("Get the DataFrame for publishing to Kafka (avro serialized): %s", df_tmp) df_kafka = get_kafka_df(df_tmp, key=schema, elasticc=False) - logger.debug("Ensure that the topic(s) exist on the Kafka Server") + logger.debug("Ensure that the topic '%s' exist on the Kafka Server", topicname) disquery = ( df_kafka.writeStream.format("kafka") .option("kafka.bootstrap.servers", broker_list) diff --git a/doc/debug_self_hosted.md b/doc/debug_self_hosted.md deleted file mode 
100644 index bc9028b5..00000000 --- a/doc/debug_self_hosted.md +++ /dev/null @@ -1,25 +0,0 @@ -# Debug self-hosted runner - -```shell -# Connect to k8s cluster -export KUBECONFIG="$HOME/.kube/self-hosted-ci_v4" - -# Open a shell on the runner, the runner name is available on top of github action log for the action you want to debug -kubectl exec -it -n arc-runners arc-runners-25rrh-runner-kb87p bash - -# Set environment in the pod -export CIUXCONFIG=/tmp/ciux.sh -# SUFFIX is equal to empty or noscience -export SUFFIX="" -export PATH=/home/runner/go/bin/:$PATH -sudo apt-get update -y -sudo apt-get install -y bash-completion -echo 'source <(kubectl completion bash)' >>~/.bashrc -export SRC=/home/runner/_work/fink-broker/fink-broker - -# Access source code: -ls /home/runner/_work/fink-broker/fink-broker - -# Access fink pod -kubectl get pods -``` diff --git a/doc/self_hosted.md b/doc/self_hosted.md new file mode 100644 index 00000000..9fa91af5 --- /dev/null +++ b/doc/self_hosted.md @@ -0,0 +1,75 @@ +# Set up fink self-hosted CI + +Here's a documented procedure for creating a Linux user account on Ubuntu, adding it to the Docker group, cloning a Git repository inside a script (`run_script.sh`), and running that script nightly with a cron job, using a token stored in the user’s home directory. + +## Pre-requisites + +- a Linux server with sudo access +- Docker +- git +- A Github token with the "Content" permission on the `fink-broker` Github repository. + +## Steps + +### List of commands + +```bash +# Create user +sudo adduser fink-ci --disabled-password + +# Add to Docker group +sudo usermod -aG docker fink-ci + +sudo su - fink-ci + +# Store GitHub token securely +echo "your_github_token" > /home/fink-ci/.token +chmod 600 /home/fink-ci/.token + +cat <> /home/fink-ci/fink-ci.sh +#!/bin/bash + +set -euxo pipefail + +# Load GitHub token +TOKEN=$(cat /home/fink-ci/.token) + +REPO_URL="https://github.com/astrolabsoftware/fink-broker.git" + +REPO=/home/fink-ci/fink-broker + +# Clone the repository or pull the latest changes +if [ ! -d "\$REPO" ]; then + git clone \$REPO_URL \$REPO +else + cd \$REPO + git -C \$REPO pull +fi + +# Run fink ci in science mode +\$REPO/e2e/run.sh -s +EOF + +# Make the script executable +chmod +x /home/fink-ci/fink-ci.sh +``` + +### Set Up the Cron Job to run nightly +To ensure the CI script runs nightly, set up a cron job. + +1. Open the crontab for the `fink-ci` user: + + ```bash + crontab -e + ``` + +2. Add the following line to schedule the script to run at midnight every day: + + ```bash + 0 0 * * * /home/fink-ci/fink-ci.sh >> /home/fink-ci/cronjob-$(date +\%Y-\%m-\%d).log 2>&1 + ``` + + This will log the output to `/home/fink-ci/cronjob-YYY-MM-DD.log`. + + +By following this procedure, the `fink-ci` user will be set up to automatically run fink e2e tests every night and report it to Github Actions. \ No newline at end of file diff --git a/e2e/run.sh b/e2e/run.sh index 08642d9f..feb8c32c 100755 --- a/e2e/run.sh +++ b/e2e/run.sh @@ -48,9 +48,6 @@ e2e=false push=false { -echo "Update source code" -cd $DIR/.. 
-git pull ciux_version=v0.0.4-rc8 go install github.com/k8s-school/ciux@"$ciux_version" From 3af6451d5ba7cd868b975ded5c247c8347a952b3 Mon Sep 17 00:00:00 2001 From: Fabrice Jammes Date: Fri, 13 Sep 2024 11:24:39 +0200 Subject: [PATCH 4/5] Format code and implement minor fixes --- bin/distribute.py | 11 +++-- bin/raw2science.py | 20 ++++++--- doc/self_hosted.md | 30 +++++++------- doc/troubleshoot.md | 2 +- e2e/run.sh | 80 ++++++++++++++++++------------------ fink_broker/logging_utils.py | 2 +- fink_broker/mm_utils.py | 5 ++- fink_broker/science.py | 2 + fink_broker/spark_utils.py | 1 + 9 files changed, 86 insertions(+), 67 deletions(-) diff --git a/bin/distribute.py b/bin/distribute.py index 13dbc3f4..06f43eb2 100644 --- a/bin/distribute.py +++ b/bin/distribute.py @@ -25,14 +25,12 @@ import argparse import logging import time -import os from fink_utils.spark import schema_converter from fink_broker.parser import getargs from fink_broker.spark_utils import ( init_sparksession, connect_to_raw_database, - path_exist, ) from fink_broker.distribution_utils import get_kafka_df from fink_broker.logging_utils import init_logger @@ -150,7 +148,9 @@ def main(): topicname = args.substream_prefix + userfilter.split(".")[-1] + "_ztf" if args.noscience: - logger.debug("Do not apply user-defined filter %s in no-science mode", userfilter) + logger.debug( + "Do not apply user-defined filter %s in no-science mode", userfilter + ) df_tmp = df else: logger.debug("Apply user-defined filter %s", userfilter) @@ -163,7 +163,9 @@ def main(): # avoid non-nullable bug #852 schema = schema_converter.to_avro(df_tmp.schema) - logger.debug("Get the DataFrame for publishing to Kafka (avro serialized): %s", df_tmp) + logger.debug( + "Get the DataFrame for publishing to Kafka (avro serialized): %s", df_tmp + ) df_kafka = get_kafka_df(df_tmp, key=schema, elasticc=False) logger.debug("Ensure that the topic '%s' exist on the Kafka Server", topicname) @@ -189,6 +191,7 @@ def main(): else: logger.debug("Perform multi-messenger operations") from fink_broker.mm_utils import distribute_launch_fink_mm + time_spent_in_wait, stream_distrib_list = distribute_launch_fink_mm(spark, args) if args.exit_after is not None: diff --git a/bin/raw2science.py b/bin/raw2science.py index 0ba72eb9..b2ce15fd 100644 --- a/bin/raw2science.py +++ b/bin/raw2science.py @@ -36,6 +36,7 @@ from fink_broker.spark_utils import connect_to_raw_database from fink_broker.partitioning import convert_to_datetime, convert_to_millitime + def main(): parser = argparse.ArgumentParser(description=__doc__) args = getargs(parser) @@ -82,12 +83,11 @@ def main(): # Add library versions if args.noscience: + logger.debug("Do not import fink_science because --noscience is set") fsvsn = "no-science" else: - # Do not import fink_science if --noscience is set from fink_science import __version__ as fsvsn - df = df.withColumn("fink_broker_version", F.lit(fbvsn)).withColumn( "fink_science_version", F.lit(fsvsn) ) @@ -106,8 +106,9 @@ def main(): if args.noscience: logger.info("Do not apply science modules") else: - # Do not import fink_science if --noscience is set + logger.info("Import science modules") from fink_broker.science import apply_science_modules + logger.info("Apply science modules") df = apply_science_modules(df) @@ -133,11 +134,15 @@ def main(): else: logger.debug("Perform multi-messenger operations") from fink_broker.mm_utils import raw2science_launch_fink_mm - time_spent_in_wait, countquery_mm = raw2science_launch_fink_mm(args, scitmpdatapath) + time_spent_in_wait, 
countquery_mm = raw2science_launch_fink_mm( + args, scitmpdatapath + ) if args.exit_after is not None: - logger.debug("Keep the Streaming running until something or someone ends it!") + logger.debug( + "Keep the Streaming running until something or someone ends it!" + ) # If GCN arrived, wait for the remaining time since the launch of raw2science remaining_time = args.exit_after - time_spent_in_wait remaining_time = remaining_time if remaining_time > 0 else 0 @@ -154,6 +159,7 @@ def main(): logger.fatal("Elasticc data cannot be processed without science modules") else: from fink_broker.science import apply_science_modules_elasticc + logger.info("Apply elasticc science modules") df = apply_science_modules_elasticc(df) @@ -185,7 +191,9 @@ def main(): ) if args.exit_after is not None: - logger.debug("Keep the Streaming running until something or someone ends it!") + logger.debug( + "Keep the Streaming running until something or someone ends it!" + ) time.sleep(args.exit_after) countquery.stop() else: diff --git a/doc/self_hosted.md b/doc/self_hosted.md index 9fa91af5..252482ae 100644 --- a/doc/self_hosted.md +++ b/doc/self_hosted.md @@ -5,8 +5,9 @@ Here's a documented procedure for creating a Linux user account on Ubuntu, addin ## Pre-requisites - a Linux server with sudo access -- Docker -- git +- Docker 24.0.2+ +- git 2.17.1+ +- go 1.22.5+ - A Github token with the "Content" permission on the `fink-broker` Github repository. ## Steps @@ -26,28 +27,27 @@ sudo su - fink-ci echo "your_github_token" > /home/fink-ci/.token chmod 600 /home/fink-ci/.token -cat <> /home/fink-ci/fink-ci.sh +cat < /home/fink-ci/fink-ci.sh #!/bin/bash set -euxo pipefail # Load GitHub token -TOKEN=$(cat /home/fink-ci/.token) +export TOKEN=$(cat /home/fink-ci/.token) +export USER="fink-ci" +repo_url="https://github.com/astrolabsoftware/fink-broker.git" +tmpdir=\$(mktemp -d --suffix -fink-broker-ci) +repo=\$tmpdir/fink-broker +branchname="877-automated-e2e-tests" -REPO_URL="https://github.com/astrolabsoftware/fink-broker.git" +# Set go path according to go install method +PATH=\$HOME/go/bin:/usr/local/go/bin:/usr/local/bin:\$PATH -REPO=/home/fink-ci/fink-broker - -# Clone the repository or pull the latest changes -if [ ! -d "\$REPO" ]; then - git clone \$REPO_URL \$REPO -else - cd \$REPO - git -C \$REPO pull -fi +# Clone the repository +git clone --single-branch \$repo_url \$repo --branch \$branchname # Run fink ci in science mode -\$REPO/e2e/run.sh -s +\$repo/e2e/run.sh -s -c EOF # Make the script executable diff --git a/doc/troubleshoot.md b/doc/troubleshoot.md index 1a779684..202eb295 100644 --- a/doc/troubleshoot.md +++ b/doc/troubleshoot.md @@ -12,6 +12,6 @@ kubectl run -it --rm s5cmd --image=peakcom/s5cmd --env AWS_ACCESS_KEY_ID=minio - Interactive access: ```shell kubectl run -it --rm s5cmd --image=peakcom/s5cmd --env AWS_ACCESS_KEY_ID=minio --env AWS_SECRET_ACCESS_KEY=minio123 --env S3_ENDPOINT_URL=https://minio.minio:443 --command -- sh -/s5cmd --log debug --no-verify-ssl ls -H "s3://fink-broker-online/raw/2020/01/01/" +/s5cmd --log debug --no-verify-ssl ls -H "s3://fink-broker-online/raw/20200101/" /s5cmd --log debug --no-verify-ssl ls "s3://fink-broker-online/*" ``` diff --git a/e2e/run.sh b/e2e/run.sh index feb8c32c..808159e3 100755 --- a/e2e/run.sh +++ b/e2e/run.sh @@ -17,6 +17,15 @@ usage () { SUFFIX="noscience" +ciux_version=v0.0.4-rc8 +export CIUXCONFIG=$HOME/.ciux/ciux.sh + +src_dir=$DIR/.. 
+cleanup=false +build=false +e2e=false +push=false + token="${TOKEN:-}" # Get options for suffix @@ -40,29 +49,50 @@ while getopts hcs opt; do done export SUFFIX -export CIUXCONFIG=$HOME/.ciux/ciux.sh - -cleanup=false -build=false -e2e=false -push=false +function dispatch() { + url="https://api.github.com/repos/astrolabsoftware/fink-broker/dispatches" + + payload="{\"build\": $build,\"e2e\": $e2e,\"push\": $push, \"cluster\": \"$cluster\", \"image\": \"$CIUX_IMAGE_URL\"}" + echo "Payload: $payload" + + if [ -z "$token" ]; then + echo "No token provided, skipping GitHub dispatch" + else + echo "Dispatching event to GitHub" + curl -L \ + -X POST \ + -H "Accept: application/vnd.github+json" \ + -H "Authorization: Bearer $token" \ + -H "X-GitHub-Api-Version: 2022-11-28" \ + $url \ + -d "{\"event_type\":\"e2e-science\",\"client_payload\":$payload}" || echo "ERROR Failed to dispatch event" >&2 + fi + + if [ $cleanup = true -a $e2e = true ]; then + echo "Delete the cluster $cluster" + ktbx delete --name "$cluster" + else + echo "Cluster $cluster kept for debugging" + fi +} + +trap dispatch EXIT -ciux_version=v0.0.4-rc8 go install github.com/k8s-school/ciux@"$ciux_version" echo "Ignite the project using ciux" mkdir -p ~/.ciux # Build step -$DIR/../build.sh -s "$SUFFIX" +$src_dir/build.sh -s "$SUFFIX" build=true # e2e tests step -ciux ignite --selector itest $PWD --suffix "$SUFFIX" +ciux ignite --selector itest "$src_dir" --suffix "$SUFFIX" -cluster=$(ciux get clustername $DIR/..) +cluster=$(ciux get clustername "$src_dir") echo "Delete the cluster $cluster if it already exists" ktbx delete --name "$cluster" || true @@ -82,34 +112,6 @@ $DIR/check-results.sh e2e=true echo "Push the image to Container Registry" -$DIR/../push-image.sh +$src_dir/push-image.sh push=true -} - -url="https://api.github.com/repos/astrolabsoftware/fink-broker/dispatches" - -payload="{\"build\": $build,\"e2e\": $e2e,\"push\": $push, \"cluster\": \"$cluster\", \"image\": \"$CIUX_IMAGE_URL\"}" -echo "Payload: $payload" - -if [ -z "$token" ]; then - echo "No token provided, skipping GitHub dispatch" -else - echo "Dispatching event to GitHub" - curl -L \ - -X POST \ - -H "Accept: application/vnd.github+json" \ - -H "Authorization: Bearer $token" \ - -H "X-GitHub-Api-Version: 2022-11-28" \ - $url \ - -d "{\"event_type\":\"e2e-science\",\"client_payload\":$payload}" || echo "ERROR Failed to dispatch event" >&2 -fi - -if [ $cleanup = true -a $e2e = true ]; then - echo "Delete the cluster $cluster" - ktbx delete --name "$cluster" -else - echo "Cluster $cluster kept for debugging" -fi - - diff --git a/fink_broker/logging_utils.py b/fink_broker/logging_utils.py index c686dfa8..a4f5615e 100644 --- a/fink_broker/logging_utils.py +++ b/fink_broker/logging_utils.py @@ -67,7 +67,7 @@ def init_logger(log_level: str = "INFO") -> logging.Logger: logger.addHandler(streamHandler) # TODO use a configuration file in a configmap to set the log level of the py4j logger - logging.getLogger('py4j').setLevel(logging.INFO) + logging.getLogger("py4j").setLevel(logging.INFO) return logger diff --git a/fink_broker/mm_utils.py b/fink_broker/mm_utils.py index a99a9c60..22eb76b0 100644 --- a/fink_broker/mm_utils.py +++ b/fink_broker/mm_utils.py @@ -89,7 +89,10 @@ def distribute_launch_fink_mm(spark, args: dict) -> Tuple[int, List]: _LOG.warning("No configuration found for fink-mm -- not applied") return 0, [] -def raw2science_launch_fink_mm(args: dict, scitmpdatapath: str) -> Tuple[int, StreamingQuery]: + +def raw2science_launch_fink_mm( + args: dict, 
scitmpdatapath: str +) -> Tuple[int, StreamingQuery]: """Manage multimessenger operations Parameters diff --git a/fink_broker/science.py b/fink_broker/science.py index 28b9660e..d97f9960 100644 --- a/fink_broker/science.py +++ b/fink_broker/science.py @@ -57,6 +57,7 @@ # --------------------------------- _LOG = logging.getLogger(__name__) + def dec2theta(dec: float) -> float: """Convert Dec (deg) to theta (rad)""" return np.pi / 2.0 - np.pi / 180.0 * dec @@ -66,6 +67,7 @@ def ra2phi(ra: float) -> float: """Convert RA (deg) to phi (rad)""" return np.pi / 180.0 * ra + @pandas_udf(LongType(), PandasUDFType.SCALAR) def ang2pix(ra: pd.Series, dec: pd.Series, nside: pd.Series) -> pd.Series: """Compute pixel number at given nside diff --git a/fink_broker/spark_utils.py b/fink_broker/spark_utils.py index e6978554..0df4470b 100644 --- a/fink_broker/spark_utils.py +++ b/fink_broker/spark_utils.py @@ -32,6 +32,7 @@ # --------------------------------- _LOG = logging.getLogger(__name__) + def from_avro(dfcol: Column, jsonformatschema: str) -> Column: """Decode the Avro data contained in a DataFrame column into a struct. From 00d938ffccd2509bf95747e73b28182481da4431 Mon Sep 17 00:00:00 2001 From: Fabrice Jammes Date: Mon, 16 Sep 2024 10:51:43 +0200 Subject: [PATCH 5/5] Add self-hosted noscience pipeline --- ...fhosted.yml => e2e-selfhosted-science.yml} | 0 doc/{self_hosted.md => self_hosted_ci.md} | 31 +++-------- e2e/fink-ci.sh | 52 +++++++++++++++++++ e2e/run.sh | 11 +++- 4 files changed, 68 insertions(+), 26 deletions(-) rename .github/workflows/{e2e-selfhosted.yml => e2e-selfhosted-science.yml} (100%) rename doc/{self_hosted.md => self_hosted_ci.md} (58%) create mode 100644 e2e/fink-ci.sh diff --git a/.github/workflows/e2e-selfhosted.yml b/.github/workflows/e2e-selfhosted-science.yml similarity index 100% rename from .github/workflows/e2e-selfhosted.yml rename to .github/workflows/e2e-selfhosted-science.yml diff --git a/doc/self_hosted.md b/doc/self_hosted_ci.md similarity index 58% rename from doc/self_hosted.md rename to doc/self_hosted_ci.md index 252482ae..29b7d33b 100644 --- a/doc/self_hosted.md +++ b/doc/self_hosted_ci.md @@ -27,28 +27,8 @@ sudo su - fink-ci echo "your_github_token" > /home/fink-ci/.token chmod 600 /home/fink-ci/.token -cat < /home/fink-ci/fink-ci.sh -#!/bin/bash - -set -euxo pipefail - -# Load GitHub token -export TOKEN=$(cat /home/fink-ci/.token) -export USER="fink-ci" -repo_url="https://github.com/astrolabsoftware/fink-broker.git" -tmpdir=\$(mktemp -d --suffix -fink-broker-ci) -repo=\$tmpdir/fink-broker -branchname="877-automated-e2e-tests" - -# Set go path according to go install method -PATH=\$HOME/go/bin:/usr/local/go/bin:/usr/local/bin:\$PATH - -# Clone the repository -git clone --single-branch \$repo_url \$repo --branch \$branchname - -# Run fink ci in science mode -\$repo/e2e/run.sh -s -c -EOF +# Retrieve the fink-ci script +curl https://raw.githubusercontent.com/astrolabsoftware/fink-broker/master/e2e/fink-ci.sh # Make the script executable chmod +x /home/fink-ci/fink-ci.sh @@ -63,13 +43,14 @@ To ensure the CI script runs nightly, set up a cron job. crontab -e ``` -2. Add the following line to schedule the script to run at midnight every day: +2. 
Add the following line to schedule the `noscience` and `science` scripts to run every night: ```bash - 0 0 * * * /home/fink-ci/fink-ci.sh >> /home/fink-ci/cronjob-$(date +\%Y-\%m-\%d).log 2>&1 + 0 0 * * * /home/fink-ci/fink-ci.sh -c >> /home/fink-ci/cronjob-noscience-$(date +\%Y-\%m-\%d).log 2>&1 + 0 1 * * * /home/fink-ci/fink-ci.sh -c -s >> /home/fink-ci/cronjob-science-$(date +\%Y-\%m-\%d).log 2>&1 ``` - This will log the output to `/home/fink-ci/cronjob-YYY-MM-DD.log`. + This will log the output to `/home/fink-ci/cronjob--YYY-MM-DD.log`. By following this procedure, the `fink-ci` user will be set up to automatically run fink e2e tests every night and report it to Github Actions. \ No newline at end of file diff --git a/e2e/fink-ci.sh b/e2e/fink-ci.sh new file mode 100644 index 00000000..d3080f2a --- /dev/null +++ b/e2e/fink-ci.sh @@ -0,0 +1,52 @@ +#!/bin/bash + +# Clone Fink-broker source code in a tempory directory and run e2e test +# designed to be run as a cron job. + +set -euxo pipefail + +export TOKEN=$(cat /home/fink-ci/.token) +export USER="fink-ci" +repo_url="https://github.com/astrolabsoftware/fink-broker.git" +tmpdir=$(mktemp -d --suffix -fink-broker-ci) +repo=$tmpdir/fink-broker +science_opt="" +cleanup_opt="" + +# Set go path according to go install method +PATH=$HOME/go/bin:/usr/local/go/bin:/usr/local/bin:$PATH + +branchname="master" + +usage() { + cat << EOD +Usage: $(basename "$0") [options] +Available options: + -h This message + -c Cleanup the cluster if the tests are successful + -s Use the science algorithms during the tests + -b Branch name to clone (default: master) + +Clone Fink-broker source code in a tempory directory and run e2e test +designed to be run as a cron job. + +EOD +} + +# Get the options +while getopts hsb: c ; do + case $c in + h) usage ; exit 0 ;; + b) branchname=$OPTARG ;; + c) cleanup_opt="-c" ;; + s) science_opt="-s" ;; + \?) usage ; exit 2 ;; + esac +done +shift "$((OPTIND-1))" + +# Clone the repository +git clone --single-branch $repo_url $repo --branch $branchname + +# Run fink ci in science mode +$repo/e2e/run.sh "$cleanup_opt" $science_opt diff --git a/e2e/run.sh b/e2e/run.sh index 808159e3..c7eb3ee2 100755 --- a/e2e/run.sh +++ b/e2e/run.sh @@ -52,6 +52,15 @@ export SUFFIX function dispatch() { + + if [ $SUFFIX = "" ]; then + echo "Running e2e tests with science algorithms" + event_type="e2e-science" + else + echo "Running e2e tests without science algorithms" + event_type="e2e-noscience" + fi + url="https://api.github.com/repos/astrolabsoftware/fink-broker/dispatches" payload="{\"build\": $build,\"e2e\": $e2e,\"push\": $push, \"cluster\": \"$cluster\", \"image\": \"$CIUX_IMAGE_URL\"}" @@ -67,7 +76,7 @@ function dispatch() -H "Authorization: Bearer $token" \ -H "X-GitHub-Api-Version: 2022-11-28" \ $url \ - -d "{\"event_type\":\"e2e-science\",\"client_payload\":$payload}" || echo "ERROR Failed to dispatch event" >&2 + -d "{\"event_type\":\"$event_type\",\"client_payload\":$payload}" || echo "ERROR Failed to dispatch event" >&2 fi if [ $cleanup = true -a $e2e = true ]; then