Skip to content

Commit

Permalink
Flink SQL + Nu Batch support (#4)
Browse files Browse the repository at this point in the history
Co-authored-by: MK Software <[email protected]>
  • Loading branch information
coutoPL and mk-software-pl authored Sep 24, 2024
1 parent 36a8d2a commit a3b0134
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/bin/bash -e

# Entry checks for the Flink DML runner.
# Usage: <this script> <scenario example folder path>

# All relative paths used by this script are resolved against its own directory.
cd "$(dirname "$0")"

# Shared helpers (red_echo is presumably provided by this library).
source ../../utils/lib.sh

# Exactly one argument is accepted: the scenario example folder path.
if [[ $# -ne 1 ]]; then
  red_echo "ERROR: One parameter required: 1) scenario example folder path\n"
  exit 1
fi

# Hands a single Flink SQL file over to the shared SQL executor script.
#
# Arguments:
#   $1 - path to the Flink SQL file
# Outputs:
#   A progress line naming the file being executed.
# Exits:
#   11 when not called with exactly one argument.
execute_flink_sql_file() {
  if (( $# != 1 )); then
    red_echo "ERROR: One parameter required: 1) Flink SQL file\n"
    exit 11
  fi

  # Enable errexit so a failing executor run aborts instead of being ignored.
  set -e

  local sql_file=$1

  printf "Executing Flink SQL file '%s'... \n" "$(basename "$sql_file")"
  ../../utils/flink/execute-flink-sql-scripts.sh "$sql_file"
}

# Scenario folder path with a single trailing slash removed, if present.
SCENARIO_EXAMPLE_DIR_PATH=${1%/}

echo "Starting to execute Flink DML scripts..."

# With nullglob a missing or empty directory yields zero loop iterations.
shopt -s nullglob

for SQL_CANDIDATE in "$SCENARIO_EXAMPLE_DIR_PATH/data/flink/static"/*; do
  # Only regular files are considered; anything else is silently skipped.
  [[ -f "$SQL_CANDIDATE" ]] || continue

  case "$SQL_CANDIDATE" in
    *.sql)
      execute_flink_sql_file "$SQL_CANDIDATE"
      ;;
    *)
      # A regular file with any other extension aborts the whole run.
      red_echo "ERROR: Unrecognized file $SQL_CANDIDATE. Required file with extension '.sql' and content with DML statements\n"
      exit 3
      ;;
  esac
done

echo -e "Flink DML scripts executed!\n"
1 change: 1 addition & 0 deletions scenario-examples-bootstrapper/data/keep-sending.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ if is_data_generation_active; then
if is_scenario_enabled "$FOLDER"; then
echo -e "Starting to send static and generated data for scenario from ${GREEN}$FOLDER${RESET} directory...\n\n"

./flink/execute-flink-dml-scripts.sh "$FOLDER"
./http/send-http-static-requests.sh "$FOLDER"
./kafka/send-kafka-static-messages.sh "$FOLDER"
./http/continuously-send-http-generated-requests.sh "$FOLDER"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/bin/bash -e

# Entry checks for the Flink DDL runner.
# Usage: <this script> <scenario example folder path>

# All relative paths used by this script are resolved against its own directory.
cd "$(dirname "$0")"

# Shared helpers (red_echo is presumably provided by this library).
source ../../utils/lib.sh

# Exactly one argument is accepted: the scenario example folder path.
if [[ $# -ne 1 ]]; then
  red_echo "ERROR: One parameter required: 1) scenario example folder path\n"
  exit 1
fi

# Hands a single Flink SQL file over to the shared SQL executor script.
#
# Arguments:
#   $1 - path to the Flink SQL file
# Outputs:
#   A progress line naming the file being executed.
# Exits:
#   11 when not called with exactly one argument.
execute_flink_sql_file() {
  if (( $# != 1 )); then
    red_echo "ERROR: One parameter required: 1) Flink SQL file\n"
    exit 11
  fi

  # Enable errexit so a failing executor run aborts instead of being ignored.
  set -e

  local sql_file=$1

  printf "Executing Flink SQL file '%s'... \n" "$(basename "$sql_file")"
  ../../utils/flink/execute-flink-sql-scripts.sh "$sql_file"
}

# Scenario folder path with a single trailing slash removed, if present.
SCENARIO_EXAMPLE_DIR_PATH=${1%/}

echo "Starting to run Flink DDL scripts..."

# With nullglob a missing or empty directory yields zero loop iterations.
shopt -s nullglob

for SQL_CANDIDATE in "$SCENARIO_EXAMPLE_DIR_PATH/setup/flink"/*; do
  # Only regular files are considered; anything else is silently skipped.
  [[ -f "$SQL_CANDIDATE" ]] || continue

  case "$SQL_CANDIDATE" in
    *.sql)
      execute_flink_sql_file "$SQL_CANDIDATE"
      ;;
    *)
      # A regular file with any other extension aborts the whole run.
      red_echo "ERROR: Unrecognized file $SQL_CANDIDATE. Required file with extension '.sql'\n"
      exit 2
      ;;
  esac
done

echo -e "Flink DDL scripts executed!\n"
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ function import_and_deploy_scenario() {
local EXAMPLE_SCENARIO_FILE=$2

../../utils/nu/load-scenario-from-json-file.sh "$EXAMPLE_SCENARIO_NAME" "$EXAMPLE_SCENARIO_FILE"
../../utils/nu/deploy-scenario-and-wait-for-running-state.sh "$EXAMPLE_SCENARIO_NAME"
../../utils/nu/deploy-scenario-and-wait-for-deployed-state.sh "$EXAMPLE_SCENARIO_NAME"

if ! should_deploy_scenario "$SCENARIO_EXAMPLE_DIR_PATH"; then
../../utils/nu/cancel-scenario-and-wait-for-canceled-state.sh "$EXAMPLE_SCENARIO_NAME"
Expand Down
1 change: 1 addition & 0 deletions scenario-examples-bootstrapper/setup/run-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ for FOLDER in /scenario-examples/*; do

./schema-registry/setup-schemas.sh "$FOLDER"
./kafka/setup-topics.sh "$FOLDER"
./flink/execute-flink-ddl-scripts.sh "$FOLDER"
./nu/customize-nu-configuration.sh "$FOLDER"
./nu/import-and-deploy-example-scenarios.sh "$FOLDER"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
#!/bin/bash -e

# Entry checks for the Flink SQL script executor.
# Usage: <this script> <Flink SQL script path>
# Required environment: FLINK_SQL_GATEWAY_ADDRESS (must be set and non-empty).

# All relative paths used by this script are resolved against its own directory.
cd "$(dirname "$0")"

# Shared helpers (red_echo is presumably provided by this library).
source ../lib.sh

if [[ $# -ne 1 ]]; then
  red_echo "ERROR: One parameter required: 1) Flink SQL script path\n"
  exit 1
fi

# An unset variable and an empty one are rejected alike.
if [[ -z "${FLINK_SQL_GATEWAY_ADDRESS:-}" ]]; then
  red_echo "ERROR: required variable FLINK_SQL_GATEWAY_ADDRESS not set or empty\n"
  exit 2
fi

FLINK_SQL_FILE_PATH=$1

if [[ ! -f "$FLINK_SQL_FILE_PATH" ]]; then
  red_echo "ERROR: Cannot find Flink SQL script file $FLINK_SQL_FILE_PATH\n"
  exit 3
fi

# NOTE(review): HOST is not referenced anywhere in the visible part of this
# script - confirm nothing sourced relies on it before removing.
HOST='localhost:8083'

# Performs one HTTP call against the Flink SQL Gateway REST API.
#
# Globals:
#   FLINK_SQL_GATEWAY_ADDRESS (read) - host[:port] the request is sent to
# Arguments:
#   $1 - HTTP method
#   $2 - endpoint path (should start with /)
#   $3 - optional request body; when non-empty it is sent as JSON
# Outputs:
#   stdout: the response body, only when the HTTP status is 200
#   stderr: every diagnostic, so a consumer piped to stdout (e.g. jq) never
#           receives an error message instead of a response body
# Exits:
#   11 on wrong usage, 1 when the HTTP status is not 200.
function apiCall() {
  if [ "$#" -lt 2 ]; then
    red_echo "Error: Two parameters required: 1) HTTP method, 2) endpoint (should start with /) \n" >&2
    exit 11
  fi

  set -e

  local METHOD=$1
  local ENDPOINT=$2
  local REQUEST_BODY=${3:-}

  # -S keeps curl's own transport errors visible on stderr despite -s.
  # -w appends the HTTP status code on a separate last line of the output.
  local -a CURL_ARGS=(
    -s -S -L -w "\n%{http_code}"
    -X "$METHOD" "http://${FLINK_SQL_GATEWAY_ADDRESS}${ENDPOINT}"
    -H "Accept: application/json"
  )
  if [[ -n "$REQUEST_BODY" ]]; then
    CURL_ARGS+=(-H "Content-Type: application/json" -d "$REQUEST_BODY")
  fi

  local RESPONSE
  RESPONSE=$(curl "${CURL_ARGS[@]}")

  # Last line is the status code; everything before it is the body.
  local HTTP_STATUS
  HTTP_STATUS=$(printf '%s\n' "$RESPONSE" | tail -n 1)
  local RESPONSE_BODY
  RESPONSE_BODY=$(printf '%s\n' "$RESPONSE" | sed '$d')

  if [ "$HTTP_STATUS" == "200" ]; then
    printf '%s\n' "$RESPONSE_BODY"
  else
    red_echo "ERROR: Call $METHOD $ENDPOINT failed. HTTP status: $HTTP_STATUS, Response: $RESPONSE_BODY" >&2
    exit 1
  fi
}

# Main flow:
#   1. open a SQL Gateway session with execution.runtime-mode=batch,
#   2. execute the statements of the SQL file one by one, polling each
#      operation until it reports FINISHED,
#   3. close the session.

SESSIONHANDLE=$(apiCall "POST" "/sessions" '{"properties": {"execution.runtime-mode": "batch"}}' | jq -r '.sessionHandle')
# jq -r prints the literal string "null" when the field is absent.
if [[ -z "$SESSIONHANDLE" || "$SESSIONHANDLE" == "null" ]]; then
  red_echo "ERROR: Failed to establish session."
  exit 1
fi
echo "Session $SESSIONHANDLE established."

SQL_CONTENT=$(<"$FLINK_SQL_FILE_PATH")

# Split the file content on ';' into an array. IFS is changed for this single
# read only, and no unquoted expansion is involved, so nothing in the SQL text
# (e.g. '*') is ever subject to pathname expansion.
# Known limitation (unchanged): a ';' or '--' inside a string literal is still
# treated as a statement separator / comment start.
SQL_STATEMENTS=()
IFS=';' read -r -d '' -a SQL_STATEMENTS < <(printf '%s\0' "$SQL_CONTENT") || true

echo ""
for statement in "${SQL_STATEMENTS[@]}"; do

  # Drop '--' comments, then flatten the statement into a single line.
  SQL_ONE_LINE=$(printf '%s\n' "$statement" | sed 's/--.*//' | tr '\n' ' ' | tr -s ' ')

  # Nothing left but whitespace (blank chunk or comment-only chunk): skip it
  # instead of submitting a bare ';' to the gateway.
  if [[ "$SQL_ONE_LINE" =~ ^[[:space:]]*$ ]]; then
    continue
  fi

  SQL_STATEMENT_ONE_LINE_WITHOUT_COMMENTS="${SQL_ONE_LINE};"

  echo -e "Processing SQL statement:\n$SQL_STATEMENT_ONE_LINE_WITHOUT_COMMENTS"

  # Let jq build the request so quotes, backslashes and control characters
  # in the SQL are escaped into valid JSON.
  STATEMENT_REQUEST_BODY=$(jq -cn --arg statement "$SQL_STATEMENT_ONE_LINE_WITHOUT_COMMENTS" '{statement: $statement}')

  OPERATIONHANDLE=$(apiCall "POST" "/sessions/$SESSIONHANDLE/statements" "$STATEMENT_REQUEST_BODY" | jq -r '.operationHandle')
  if [[ -z "$OPERATIONHANDLE" || "$OPERATIONHANDLE" == "null" ]]; then
    red_echo "ERROR: Failed to submit SQL statement."
    exit 1
  fi

  while true; do
    STATUS=$(apiCall "GET" "/sessions/$SESSIONHANDLE/operations/$OPERATIONHANDLE/status" | jq -r '.status')
    echo "Status: $STATUS"

    case "$STATUS" in
      # Non-terminal states: the operation has not completed yet.
      "INITIALIZED" | "PENDING" | "RUNNING")
        sleep 2
        ;;
      "FINISHED")
        break
        ;;
      *)
        red_echo "ERROR: Unexpected status: $STATUS"
        exit 1
        ;;
    esac
  done

  echo "--------------------------"
done

STATUS=$(apiCall "DELETE" "/sessions/$SESSIONHANDLE" | jq -r '.status')
case "$STATUS" in
  "CLOSED")
    echo -e "\nSession $SESSIONHANDLE closed."
    ;;
  *)
    red_echo "ERROR: Unexpected status: $STATUS"
    exit 1
    ;;
esac
Original file line number Diff line number Diff line change
Expand Up @@ -83,21 +83,22 @@ END_TIME=$((START_TIME + TIMEOUT_SECONDS))

deploy_scenario "$SCENARIO_NAME"

DEPLOYMENT_STATUS=""
while true; do
DEPLOYMENT_STATUS=$(check_deployment_status "$SCENARIO_NAME")

if [ "$DEPLOYMENT_STATUS" == "RUNNING" ]; then
if [[ "$DEPLOYMENT_STATUS" == "RUNNING" || "$DEPLOYMENT_STATUS" == "FINISHED" ]]; then
break
fi

CURRENT_TIME=$(date +%s)
if [ $CURRENT_TIME -gt $END_TIME ]; then
red_echo "ERROR: Timeout for waiting for the RUNNING state of $SCENARIO_NAME deployment reached!\n"
red_echo "ERROR: Timeout for waiting for the RUNNING (or FINISHED) state of $SCENARIO_NAME deployment reached!\n"
exit 3
fi

echo "$SCENARIO_NAME deployment state is $DEPLOYMENT_STATUS. Checking again in $WAIT_INTERVAL seconds..."
sleep $WAIT_INTERVAL
done

echo "Scenario $SCENARIO_NAME is RUNNING!"
echo "Scenario $SCENARIO_NAME is $DEPLOYMENT_STATUS!"
Original file line number Diff line number Diff line change
Expand Up @@ -143,23 +143,44 @@ function save_scenario() {
echo "Scenario $SCENARIO_NAME saved successfully."
}

META_DATA_TYPE=$(jq -r .metaData.additionalFields.metaDataType < "$SCENARIO_FILE_PATH")
case "$META_DATA_TYPE" in
"StreamMetaData")
SCENARIO_FILE_NAME="${SCENARIO_FILE_PATH%.*}"
case "$SCENARIO_FILE_NAME" in
*streaming)
echo "Assuming that scenario in $SCENARIO_FILE_PATH is a Streaming scenario..."
ENGINE="Flink"
PROCESSING_MODE="Unbounded-Stream"
;;
"LiteStreamMetaData")
ENGINE="Lite Embedded"
PROCESSING_MODE="Unbounded-Stream"
;;
"RequestResponseMetaData")
*request-response)
echo "Assuming that scenario in $SCENARIO_FILE_PATH is a Request-Response scenario..."
ENGINE="Lite Embedded"
PROCESSING_MODE="Request-Response"
;;
*batch)
echo "Assuming that scenario in $SCENARIO_FILE_PATH is a Batch scenario..."
ENGINE="Flink"
PROCESSING_MODE="Bounded-Stream"
;;
*)
red_echo "ERROR: Cannot import scenario with metadata type: $META_DATA_TYPE\n"
exit 4
echo "Cannot distinguish processing mode based on scenario filename. Using metadata..."
META_DATA_TYPE=$(jq -r .metaData.additionalFields.metaDataType < "$SCENARIO_FILE_PATH")
case "$META_DATA_TYPE" in
"StreamMetaData")
ENGINE="Flink"
PROCESSING_MODE="Unbounded-Stream"
;;
"LiteStreamMetaData")
ENGINE="Lite Embedded"
PROCESSING_MODE="Unbounded-Stream"
;;
"RequestResponseMetaData")
ENGINE="Lite Embedded"
PROCESSING_MODE="Request-Response"
;;
*)
red_echo "ERROR: Cannot import scenario with metadata type: $META_DATA_TYPE\n"
exit 4
;;
esac
;;
esac

Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
LIBRARY_DOCKER_IMAGE_VERSION=0.3.0
LIBRARY_DOCKER_IMAGE_VERSION=0.4.0

0 comments on commit a3b0134

Please sign in to comment.