From a3b013466a8edef795c5e1b6cb8e90000d965ec9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Ko=C5=82odziejczyk?= Date: Tue, 24 Sep 2024 12:15:48 +0200 Subject: [PATCH] Flink SQL + Nu Batch support (#4) Co-authored-by: MK Software --- .../data/flink/execute-flink-dml-scripts.sh | 45 +++++++ .../data/keep-sending.sh | 1 + .../setup/flink/execute-flink-ddl-scripts.sh | 45 +++++++ .../nu/import-and-deploy-example-scenarios.sh | 2 +- .../setup/run-setup.sh | 1 + .../utils/flink/execute-flink-sql-scripts.sh | 121 ++++++++++++++++++ ...y-scenario-and-wait-for-deployed-state.sh} | 7 +- .../utils/nu/load-scenario-from-json-file.sh | 41 ++++-- version | 2 +- 9 files changed, 250 insertions(+), 15 deletions(-) create mode 100755 scenario-examples-bootstrapper/data/flink/execute-flink-dml-scripts.sh create mode 100755 scenario-examples-bootstrapper/setup/flink/execute-flink-ddl-scripts.sh create mode 100755 scenario-examples-bootstrapper/utils/flink/execute-flink-sql-scripts.sh rename scenario-examples-bootstrapper/utils/nu/{deploy-scenario-and-wait-for-running-state.sh => deploy-scenario-and-wait-for-deployed-state.sh} (89%) diff --git a/scenario-examples-bootstrapper/data/flink/execute-flink-dml-scripts.sh b/scenario-examples-bootstrapper/data/flink/execute-flink-dml-scripts.sh new file mode 100755 index 0000000..0a996a1 --- /dev/null +++ b/scenario-examples-bootstrapper/data/flink/execute-flink-dml-scripts.sh @@ -0,0 +1,45 @@ +#!/bin/bash -e + +cd "$(dirname "$0")" + +source ../../utils/lib.sh + +if [ "$#" -ne 1 ]; then + red_echo "ERROR: One parameter required: 1) scenario example folder path\n" + exit 1 +fi + +function execute_flink_sql_file() { + if [ "$#" -ne 1 ]; then + red_echo "ERROR: One parameter required: 1) Flink SQL file\n" + exit 11 + fi + + set -e + + local FLINK_SQL_FILE_PATH=$1 + + echo "Executing Flink SQL file '$(basename "$FLINK_SQL_FILE_PATH")'... " + ../../utils/flink/execute-flink-sql-scripts.sh "$FLINK_SQL_FILE_PATH" +} + +SCENARIO_EXAMPLE_DIR_PATH=${1%/} + +echo "Starting to execute Flink DML scripts..." + +shopt -s nullglob + +for ITEM in "$SCENARIO_EXAMPLE_DIR_PATH/data/flink/static"/*; do + if [ ! -f "$ITEM" ]; then + continue + fi + + if [[ ! "$ITEM" == *.sql ]]; then + red_echo "ERROR: Unrecognized file $ITEM. Required file with extension '.sql' and content with DML statements\n" + exit 3 + fi + + execute_flink_sql_file "$ITEM" +done + +echo -e "Flink DML scripts executed!\n" diff --git a/scenario-examples-bootstrapper/data/keep-sending.sh b/scenario-examples-bootstrapper/data/keep-sending.sh index 2293b4e..67620e6 100755 --- a/scenario-examples-bootstrapper/data/keep-sending.sh +++ b/scenario-examples-bootstrapper/data/keep-sending.sh @@ -13,6 +13,7 @@ if is_data_generation_active; then if is_scenario_enabled "$FOLDER"; then echo -e "Starting to send static and generated data for scenario from ${GREEN}$FOLDER${RESET} directory...\n\n" + ./flink/execute-flink-dml-scripts.sh "$FOLDER" ./http/send-http-static-requests.sh "$FOLDER" ./kafka/send-kafka-static-messages.sh "$FOLDER" ./http/continuously-send-http-generated-requests.sh "$FOLDER" diff --git a/scenario-examples-bootstrapper/setup/flink/execute-flink-ddl-scripts.sh b/scenario-examples-bootstrapper/setup/flink/execute-flink-ddl-scripts.sh new file mode 100755 index 0000000..937d03b --- /dev/null +++ b/scenario-examples-bootstrapper/setup/flink/execute-flink-ddl-scripts.sh @@ -0,0 +1,45 @@ +#!/bin/bash -e + +cd "$(dirname "$0")" + +source ../../utils/lib.sh + +if [ "$#" -ne 1 ]; then + red_echo "ERROR: One parameter required: 1) scenario example folder path\n" + exit 1 +fi + +function execute_flink_sql_file() { + if [ "$#" -ne 1 ]; then + red_echo "ERROR: One parameter required: 1) Flink SQL file\n" + exit 11 + fi + + set -e + + local FLINK_SQL_FILE_PATH=$1 + + echo "Executing Flink SQL file '$(basename "$FLINK_SQL_FILE_PATH")'... " + ../../utils/flink/execute-flink-sql-scripts.sh "$FLINK_SQL_FILE_PATH" +} + +SCENARIO_EXAMPLE_DIR_PATH=${1%/} + +echo "Starting to run Flink DDL scripts..." + +shopt -s nullglob + +for ITEM in "$SCENARIO_EXAMPLE_DIR_PATH/setup/flink"/*; do + if [ ! -f "$ITEM" ]; then + continue + fi + + if [[ ! "$ITEM" == *.sql ]]; then + red_echo "ERROR: Unrecognized file $ITEM. Required file with extension '.sql'\n" + exit 2 + fi + + execute_flink_sql_file "$ITEM" +done + +echo -e "Flink DDL scripts executed!\n" diff --git a/scenario-examples-bootstrapper/setup/nu/import-and-deploy-example-scenarios.sh b/scenario-examples-bootstrapper/setup/nu/import-and-deploy-example-scenarios.sh index 83dee8c..9bdf19a 100755 --- a/scenario-examples-bootstrapper/setup/nu/import-and-deploy-example-scenarios.sh +++ b/scenario-examples-bootstrapper/setup/nu/import-and-deploy-example-scenarios.sh @@ -23,7 +23,7 @@ function import_and_deploy_scenario() { local EXAMPLE_SCENARIO_FILE=$2 ../../utils/nu/load-scenario-from-json-file.sh "$EXAMPLE_SCENARIO_NAME" "$EXAMPLE_SCENARIO_FILE" - ../../utils/nu/deploy-scenario-and-wait-for-running-state.sh "$EXAMPLE_SCENARIO_NAME" + ../../utils/nu/deploy-scenario-and-wait-for-deployed-state.sh "$EXAMPLE_SCENARIO_NAME" if ! should_deploy_scenario "$SCENARIO_EXAMPLE_DIR_PATH"; then ../../utils/nu/cancel-scenario-and-wait-for-canceled-state.sh "$EXAMPLE_SCENARIO_NAME" diff --git a/scenario-examples-bootstrapper/setup/run-setup.sh b/scenario-examples-bootstrapper/setup/run-setup.sh index 332080c..5088d2a 100755 --- a/scenario-examples-bootstrapper/setup/run-setup.sh +++ b/scenario-examples-bootstrapper/setup/run-setup.sh @@ -14,6 +14,7 @@ for FOLDER in /scenario-examples/*; do ./schema-registry/setup-schemas.sh "$FOLDER" ./kafka/setup-topics.sh "$FOLDER" + ./flink/execute-flink-ddl-scripts.sh "$FOLDER" ./nu/customize-nu-configuration.sh "$FOLDER" ./nu/import-and-deploy-example-scenarios.sh "$FOLDER" diff --git a/scenario-examples-bootstrapper/utils/flink/execute-flink-sql-scripts.sh b/scenario-examples-bootstrapper/utils/flink/execute-flink-sql-scripts.sh new file mode 100755 index 0000000..132a412 --- /dev/null +++ b/scenario-examples-bootstrapper/utils/flink/execute-flink-sql-scripts.sh @@ -0,0 +1,121 @@ +#!/bin/bash -e + +cd "$(dirname "$0")" + +source ../lib.sh + +if [ "$#" -ne 1 ]; then + red_echo "ERROR: One parameter required: 1) Flink SQL script path\n" + exit 1 +fi + +if ! [ -v FLINK_SQL_GATEWAY_ADDRESS ] || [ -z "$FLINK_SQL_GATEWAY_ADDRESS" ]; then + red_echo "ERROR: required variable FLINK_SQL_GATEWAY_ADDRESS not set or empty\n" + exit 2 +fi + +FLINK_SQL_FILE_PATH=$1 + +if [ ! -f "$FLINK_SQL_FILE_PATH" ]; then + red_echo "ERROR: Cannot find Flink SQL script file $FLINK_SQL_FILE_PATH\n" + exit 3 +fi + +HOST='localhost:8083' + +function apiCall() { + if [ "$#" -lt 2 ]; then + red_echo "Error: Two parameters required: 1) HTTP method, 2) endpoint (should start with /) \n" + exit 11 + fi + + set -e + + local METHOD=$1 + local ENDPOINT=$2 + local REQUEST_BODY=$3 + + local RESPONSE + if [[ -n "$REQUEST_BODY" ]]; then + RESPONSE=$(curl -s -L -w "\n%{http_code}" \ + -X "$METHOD" "http://${FLINK_SQL_GATEWAY_ADDRESS}${ENDPOINT}" \ + -H "Accept: application/json" \ + -H "Content-Type: application/json" \ + -d "$REQUEST_BODY" + ) + else + RESPONSE=$(curl -s -L -w "\n%{http_code}" \ + -X "$METHOD" "http://${FLINK_SQL_GATEWAY_ADDRESS}${ENDPOINT}" \ + -H "Accept: application/json" + ) + fi + + local HTTP_STATUS + HTTP_STATUS=$(echo "$RESPONSE" | tail -n 1) + + if [ "$HTTP_STATUS" == "200" ]; then + local RESPONSE_BODY + RESPONSE_BODY=$(echo "$RESPONSE" | sed \$d) + echo "$RESPONSE_BODY" + else + local RESPONSE_BODY + RESPONSE_BODY=$(echo "$RESPONSE" | sed \$d) + red_echo "ERROR: Call $METHOD $ENDPOINT failed. HTTP status: $HTTP_STATUS, Response: $RESPONSE_BODY" + exit 1 + fi +} + +SESSIONHANDLE=$(apiCall "POST" "/sessions" '{"properties": {"execution.runtime-mode": "batch"}}' | jq -r '.sessionHandle' ) +if [[ -z "$SESSIONHANDLE" ]]; then + red_echo "ERROR: Failed to establish session." + exit 1 +fi +echo "Session $SESSIONHANDLE established." + +SQL_CONTENT=$(<"$FLINK_SQL_FILE_PATH") +IFS=';' + +echo "" +for statement in $SQL_CONTENT; do + + if [[ -n "$statement" ]]; then + SQL_STATEMENT_ONE_LINE_WITHOUT_COMMENTS="$(echo $statement | sed 's/--.*//' | tr '\n' ' ' | tr -s ' ');" + + echo -e "Processing SQL statement:\n$SQL_STATEMENT_ONE_LINE_WITHOUT_COMMENTS" + + OPERATIONHANDLE=$(apiCall "POST" "/sessions/$SESSIONHANDLE/statements" "{\"statement\": \"$SQL_STATEMENT_ONE_LINE_WITHOUT_COMMENTS\"}" | jq -r '.operationHandle') + + while true; do + STATUS=$(apiCall "GET" "/sessions/$SESSIONHANDLE/operations/$OPERATIONHANDLE/status" | jq -r '.status') + echo "Status: $STATUS" + + case $STATUS in + "RUNNING") + sleep 2 + ;; + "FINISHED") + break + ;; + *) + red_echo "ERROR: Unexpected status: $STATUS" + exit 1 + ;; + esac + done + + echo "--------------------------" + fi +done + +unset IFS + +STATUS=$(apiCall "DELETE" "/sessions/$SESSIONHANDLE" | jq -r '.status') +case $STATUS in + "CLOSED") + echo -e "\nSession $SESSIONHANDLE closed." + ;; + *) + red_echo "ERROR: Unexpected status: $STATUS" + exit 1 + ;; +esac diff --git a/scenario-examples-bootstrapper/utils/nu/deploy-scenario-and-wait-for-running-state.sh b/scenario-examples-bootstrapper/utils/nu/deploy-scenario-and-wait-for-deployed-state.sh similarity index 89% rename from scenario-examples-bootstrapper/utils/nu/deploy-scenario-and-wait-for-running-state.sh rename to scenario-examples-bootstrapper/utils/nu/deploy-scenario-and-wait-for-deployed-state.sh index 02713c1..d1353ff 100755 --- a/scenario-examples-bootstrapper/utils/nu/deploy-scenario-and-wait-for-running-state.sh +++ b/scenario-examples-bootstrapper/utils/nu/deploy-scenario-and-wait-for-deployed-state.sh @@ -83,16 +83,17 @@ END_TIME=$((START_TIME + TIMEOUT_SECONDS)) deploy_scenario "$SCENARIO_NAME" +DEPLOYMENT_STATUS="" while true; do DEPLOYMENT_STATUS=$(check_deployment_status "$SCENARIO_NAME") - if [ "$DEPLOYMENT_STATUS" == "RUNNING" ]; then + if [[ "$DEPLOYMENT_STATUS" == "RUNNING" || "$DEPLOYMENT_STATUS" == "FINISHED" ]]; then break fi CURRENT_TIME=$(date +%s) if [ $CURRENT_TIME -gt $END_TIME ]; then - red_echo "ERROR: Timeout for waiting for the RUNNING state of $SCENARIO_NAME deployment reached!\n" + red_echo "ERROR: Timeout for waiting for the RUNNING (or FINISHED) state of $SCENARIO_NAME deployment reached!\n" exit 3 fi @@ -100,4 +101,4 @@ while true; do sleep $WAIT_INTERVAL done -echo "Scenario $SCENARIO_NAME is RUNNING!" +echo "Scenario $SCENARIO_NAME is $DEPLOYMENT_STATUS!" diff --git a/scenario-examples-bootstrapper/utils/nu/load-scenario-from-json-file.sh b/scenario-examples-bootstrapper/utils/nu/load-scenario-from-json-file.sh index 2ba1777..c2c055d 100755 --- a/scenario-examples-bootstrapper/utils/nu/load-scenario-from-json-file.sh +++ b/scenario-examples-bootstrapper/utils/nu/load-scenario-from-json-file.sh @@ -143,23 +143,44 @@ function save_scenario() { echo "Scenario $SCENARIO_NAME saved successfully." } -META_DATA_TYPE=$(jq -r .metaData.additionalFields.metaDataType < "$SCENARIO_FILE_PATH") -case "$META_DATA_TYPE" in - "StreamMetaData") +SCENARIO_FILE_NAME="${SCENARIO_FILE_PATH%.*}" +case "$SCENARIO_FILE_NAME" in + *streaming) + echo "Assuming that scenario in $SCENARIO_FILE_PATH is a Streaming scenario..." ENGINE="Flink" PROCESSING_MODE="Unbounded-Stream" ;; - "LiteStreamMetaData") - ENGINE="Lite Embedded" - PROCESSING_MODE="Unbounded-Stream" - ;; - "RequestResponseMetaData") + *request-response) + echo "Assuming that scenario in $SCENARIO_FILE_PATH is a Request-Response scenario..." ENGINE="Lite Embedded" PROCESSING_MODE="Request-Response" ;; + *batch) + echo "Assuming that scenario in $SCENARIO_FILE_PATH is a Batch scenario..." + ENGINE="Flink" + PROCESSING_MODE="Bounded-Stream" + ;; *) - red_echo "ERROR: Cannot import scenario with metadata type: $META_DATA_TYPE\n" - exit 4 + echo "Cannot distinguish processing mode based on scenario filename. Using metadata..." + META_DATA_TYPE=$(jq -r .metaData.additionalFields.metaDataType < "$SCENARIO_FILE_PATH") + case "$META_DATA_TYPE" in + "StreamMetaData") + ENGINE="Flink" + PROCESSING_MODE="Unbounded-Stream" + ;; + "LiteStreamMetaData") + ENGINE="Lite Embedded" + PROCESSING_MODE="Unbounded-Stream" + ;; + "RequestResponseMetaData") + ENGINE="Lite Embedded" + PROCESSING_MODE="Request-Response" + ;; + *) + red_echo "ERROR: Cannot import scenario with metadata type: $META_DATA_TYPE\n" + exit 4 + ;; + esac ;; esac diff --git a/version b/version index 868d087..5ebd6ce 100644 --- a/version +++ b/version @@ -1 +1 @@ -LIBRARY_DOCKER_IMAGE_VERSION=0.3.0 \ No newline at end of file +LIBRARY_DOCKER_IMAGE_VERSION=0.4.0