Skip to content

Commit

Permalink
Format code and implement minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
fjammes committed Sep 16, 2024
1 parent cc0f00f commit d25a3c4
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 67 deletions.
11 changes: 7 additions & 4 deletions bin/distribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,12 @@
import argparse
import logging
import time
import os

from fink_utils.spark import schema_converter
from fink_broker.parser import getargs
from fink_broker.spark_utils import (
init_sparksession,
connect_to_raw_database,
path_exist,
)
from fink_broker.distribution_utils import get_kafka_df
from fink_broker.logging_utils import init_logger
Expand Down Expand Up @@ -150,7 +148,9 @@ def main():
topicname = args.substream_prefix + userfilter.split(".")[-1] + "_ztf"

if args.noscience:
logger.debug("Do not apply user-defined filter %s in no-science mode", userfilter)
logger.debug(
"Do not apply user-defined filter %s in no-science mode", userfilter
)
df_tmp = df
else:
logger.debug("Apply user-defined filter %s", userfilter)
Expand All @@ -163,7 +163,9 @@ def main():
# avoid non-nullable bug #852
schema = schema_converter.to_avro(df_tmp.schema)

logger.debug("Get the DataFrame for publishing to Kafka (avro serialized): %s", df_tmp)
logger.debug(
"Get the DataFrame for publishing to Kafka (avro serialized): %s", df_tmp
)
df_kafka = get_kafka_df(df_tmp, key=schema, elasticc=False)

logger.debug("Ensure that the topic '%s' exist on the Kafka Server", topicname)
Expand All @@ -189,6 +191,7 @@ def main():
else:
logger.debug("Perform multi-messenger operations")
from fink_broker.mm_utils import distribute_launch_fink_mm

time_spent_in_wait, stream_distrib_list = distribute_launch_fink_mm(spark, args)

if args.exit_after is not None:
Expand Down
20 changes: 14 additions & 6 deletions bin/raw2science.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from fink_broker.spark_utils import connect_to_raw_database
from fink_broker.partitioning import convert_to_datetime, convert_to_millitime


def main():
parser = argparse.ArgumentParser(description=__doc__)
args = getargs(parser)
Expand Down Expand Up @@ -82,12 +83,11 @@ def main():

# Add library versions
if args.noscience:
logger.debug("Do not import fink_science because --noscience is set")
fsvsn = "no-science"
else:
# Do not import fink_science if --noscience is set
from fink_science import __version__ as fsvsn


df = df.withColumn("fink_broker_version", F.lit(fbvsn)).withColumn(
"fink_science_version", F.lit(fsvsn)
)
Expand All @@ -106,8 +106,9 @@ def main():
if args.noscience:
logger.info("Do not apply science modules")
else:
# Do not import fink_science if --noscience is set
logger.info("Import science modules")
from fink_broker.science import apply_science_modules

logger.info("Apply science modules")
df = apply_science_modules(df)

Expand All @@ -133,11 +134,15 @@ def main():
else:
logger.debug("Perform multi-messenger operations")
from fink_broker.mm_utils import raw2science_launch_fink_mm
time_spent_in_wait, countquery_mm = raw2science_launch_fink_mm(args, scitmpdatapath)

time_spent_in_wait, countquery_mm = raw2science_launch_fink_mm(
args, scitmpdatapath
)

if args.exit_after is not None:
logger.debug("Keep the Streaming running until something or someone ends it!")
logger.debug(
"Keep the Streaming running until something or someone ends it!"
)
# If GCN arrived, wait for the remaining time since the launch of raw2science
remaining_time = args.exit_after - time_spent_in_wait
remaining_time = remaining_time if remaining_time > 0 else 0
Expand All @@ -154,6 +159,7 @@ def main():
logger.fatal("Elasticc data cannot be processed without science modules")
else:
from fink_broker.science import apply_science_modules_elasticc

logger.info("Apply elasticc science modules")
df = apply_science_modules_elasticc(df)

Expand Down Expand Up @@ -185,7 +191,9 @@ def main():
)

if args.exit_after is not None:
logger.debug("Keep the Streaming running until something or someone ends it!")
logger.debug(
"Keep the Streaming running until something or someone ends it!"
)
time.sleep(args.exit_after)
countquery.stop()
else:
Expand Down
30 changes: 15 additions & 15 deletions doc/self_hosted.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ Here's a documented procedure for creating a Linux user account on Ubuntu, addin
## Pre-requisites

- a Linux server with sudo access
- Docker
- git
- Docker 24.0.2+
- git 2.17.1+
- go 1.22.5+
- A GitHub token with the "Content" permission on the `fink-broker` GitHub repository.

## Steps
Expand All @@ -26,28 +27,27 @@ sudo su - fink-ci
echo "your_github_token" > /home/fink-ci/.token
chmod 600 /home/fink-ci/.token

cat <<EOF >> /home/fink-ci/fink-ci.sh
cat <<EOF > /home/fink-ci/fink-ci.sh
#!/bin/bash
set -euxo pipefail
# Load GitHub token
TOKEN=$(cat /home/fink-ci/.token)
export TOKEN=$(cat /home/fink-ci/.token)
export USER="fink-ci"
repo_url="https://github.com/astrolabsoftware/fink-broker.git"
tmpdir=\$(mktemp -d --suffix -fink-broker-ci)
repo=\$tmpdir/fink-broker
branchname="877-automated-e2e-tests"
REPO_URL="https://github.com/astrolabsoftware/fink-broker.git"
# Set go path according to go install method
PATH=\$HOME/go/bin:/usr/local/go/bin:/usr/local/bin:\$PATH
REPO=/home/fink-ci/fink-broker
# Clone the repository or pull the latest changes
if [ ! -d "\$REPO" ]; then
git clone \$REPO_URL \$REPO
else
cd \$REPO
git -C \$REPO pull
fi
# Clone the repository
git clone --single-branch \$repo_url \$repo --branch \$branchname
# Run fink ci in science mode
\$REPO/e2e/run.sh -s
\$repo/e2e/run.sh -s -c
EOF

# Make the script executable
Expand Down
2 changes: 1 addition & 1 deletion doc/troubleshoot.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ kubectl run -it --rm s5cmd --image=peakcom/s5cmd --env AWS_ACCESS_KEY_ID=minio -
Interactive access:
```shell
kubectl run -it --rm s5cmd --image=peakcom/s5cmd --env AWS_ACCESS_KEY_ID=minio --env AWS_SECRET_ACCESS_KEY=minio123 --env S3_ENDPOINT_URL=https://minio.minio:443 --command -- sh
/s5cmd --log debug --no-verify-ssl ls -H "s3://fink-broker-online/raw/2020/01/01/"
/s5cmd --log debug --no-verify-ssl ls -H "s3://fink-broker-online/raw/20200101/"
/s5cmd --log debug --no-verify-ssl ls "s3://fink-broker-online/*"
```
80 changes: 41 additions & 39 deletions e2e/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ usage () {

SUFFIX="noscience"

ciux_version=v0.0.4-rc8
export CIUXCONFIG=$HOME/.ciux/ciux.sh

src_dir=$DIR/..
cleanup=false
build=false
e2e=false
push=false

token="${TOKEN:-}"

# Get options for suffix
Expand All @@ -40,29 +49,50 @@ while getopts hcs opt; do
done

export SUFFIX
export CIUXCONFIG=$HOME/.ciux/ciux.sh

cleanup=false
build=false
e2e=false
push=false

# Report the outcome of the CI run to GitHub, then optionally delete the cluster.
#
# This function is registered as an EXIT trap, so it can fire at any point of
# the script -- including before `cluster` or `CIUX_IMAGE_URL` have been
# assigned. Those two variables are therefore expanded with an empty default so
# the handler cannot itself abort on an unbound variable (relevant if the
# script runs under `set -u`).
#
# Globals read: build, e2e, push, cleanup, token, cluster, CIUX_IMAGE_URL
function dispatch()
{
    url="https://api.github.com/repos/astrolabsoftware/fink-broker/dispatches"

    # build/e2e/push are emitted as bare JSON booleans (true/false)
    payload="{\"build\": $build,\"e2e\": $e2e,\"push\": $push, \"cluster\": \"${cluster:-}\", \"image\": \"${CIUX_IMAGE_URL:-}\"}"
    echo "Payload: $payload"

    if [ -z "$token" ]; then
        echo "No token provided, skipping GitHub dispatch"
    else
        echo "Dispatching event to GitHub"
        # Best effort: a failed dispatch is reported on stderr but does not
        # prevent the cluster clean-up below.
        curl -L \
            -X POST \
            -H "Accept: application/vnd.github+json" \
            -H "Authorization: Bearer $token" \
            -H "X-GitHub-Api-Version: 2022-11-28" \
            "$url" \
            -d "{\"event_type\":\"e2e-science\",\"client_payload\":$payload}" || echo "ERROR Failed to dispatch event" >&2
    fi

    # Only delete the cluster when clean-up was requested AND the e2e tests
    # succeeded; otherwise keep it around for debugging.
    # Two quoted tests joined by && replace the obsolescent `[ ... -a ... ]`.
    if [ "$cleanup" = true ] && [ "$e2e" = true ]; then
        echo "Delete the cluster ${cluster:-}"
        ktbx delete --name "${cluster:-}"
    else
        echo "Cluster ${cluster:-} kept for debugging"
    fi
}

trap dispatch EXIT

ciux_version=v0.0.4-rc8
go install github.com/k8s-school/ciux@"$ciux_version"

echo "Ignite the project using ciux"
mkdir -p ~/.ciux

# Build step
$DIR/../build.sh -s "$SUFFIX"
$src_dir/build.sh -s "$SUFFIX"
build=true

# e2e tests step
ciux ignite --selector itest $PWD --suffix "$SUFFIX"
ciux ignite --selector itest "$src_dir" --suffix "$SUFFIX"

cluster=$(ciux get clustername $DIR/..)
cluster=$(ciux get clustername "$src_dir")
echo "Delete the cluster $cluster if it already exists"
ktbx delete --name "$cluster" || true

Expand All @@ -82,34 +112,6 @@ $DIR/check-results.sh
e2e=true

echo "Push the image to Container Registry"
$DIR/../push-image.sh
$src_dir/push-image.sh
push=true
}

url="https://api.github.com/repos/astrolabsoftware/fink-broker/dispatches"

payload="{\"build\": $build,\"e2e\": $e2e,\"push\": $push, \"cluster\": \"$cluster\", \"image\": \"$CIUX_IMAGE_URL\"}"
echo "Payload: $payload"

if [ -z "$token" ]; then
echo "No token provided, skipping GitHub dispatch"
else
echo "Dispatching event to GitHub"
curl -L \
-X POST \
-H "Accept: application/vnd.github+json" \
-H "Authorization: Bearer $token" \
-H "X-GitHub-Api-Version: 2022-11-28" \
$url \
-d "{\"event_type\":\"e2e-science\",\"client_payload\":$payload}" || echo "ERROR Failed to dispatch event" >&2
fi

if [ $cleanup = true -a $e2e = true ]; then
echo "Delete the cluster $cluster"
ktbx delete --name "$cluster"
else
echo "Cluster $cluster kept for debugging"
fi



2 changes: 1 addition & 1 deletion fink_broker/logging_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def init_logger(log_level: str = "INFO") -> logging.Logger:
logger.addHandler(streamHandler)

# TODO use a configuration file in a configmap to set the log level of the py4j logger
logging.getLogger('py4j').setLevel(logging.INFO)
logging.getLogger("py4j").setLevel(logging.INFO)

return logger

Expand Down
5 changes: 4 additions & 1 deletion fink_broker/mm_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,10 @@ def distribute_launch_fink_mm(spark, args: dict) -> Tuple[int, List]:
_LOG.warning("No configuration found for fink-mm -- not applied")
return 0, []

def raw2science_launch_fink_mm(args: dict, scitmpdatapath: str) -> Tuple[int, StreamingQuery]:

def raw2science_launch_fink_mm(
args: dict, scitmpdatapath: str
) -> Tuple[int, StreamingQuery]:
"""Manage multimessenger operations
Parameters
Expand Down
2 changes: 2 additions & 0 deletions fink_broker/science.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
# ---------------------------------
_LOG = logging.getLogger(__name__)


def dec2theta(dec: float) -> float:
    """Return the angle theta (in radians) for a declination given in degrees.

    Theta is computed as pi/2 minus the declination expressed in radians.
    """
    dec_in_radians = np.pi / 180.0 * dec
    theta = np.pi / 2.0 - dec_in_radians
    return theta
Expand All @@ -66,6 +67,7 @@ def ra2phi(ra: float) -> float:
"""Convert RA (deg) to phi (rad)"""
return np.pi / 180.0 * ra


@pandas_udf(LongType(), PandasUDFType.SCALAR)
def ang2pix(ra: pd.Series, dec: pd.Series, nside: pd.Series) -> pd.Series:
"""Compute pixel number at given nside
Expand Down
1 change: 1 addition & 0 deletions fink_broker/spark_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
# ---------------------------------
_LOG = logging.getLogger(__name__)


def from_avro(dfcol: Column, jsonformatschema: str) -> Column:
"""Decode the Avro data contained in a DataFrame column into a struct.
Expand Down

0 comments on commit d25a3c4

Please sign in to comment.