From 61c3e2ca1954ced2e6278c0f1de3b8634f039023 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Wed, 8 Nov 2023 22:38:08 +0800 Subject: [PATCH] fix(udf): add graceful shutdown for python UDF server (#13285) Signed-off-by: Runji Wang --- ci/scripts/run-e2e-test.sh | 6 ++-- e2e_test/udf/graceful_shutdown_python.slt | 36 +++++++++++++++++++ e2e_test/udf/test.py | 10 +++++- .../src/main/java/com/example/UdfExample.java | 12 +++++++ src/udf/python/CHANGELOG.md | 14 ++++++++ src/udf/python/pyproject.toml | 2 +- src/udf/python/risingwave/udf.py | 8 ++++- 7 files changed, 83 insertions(+), 5 deletions(-) create mode 100644 e2e_test/udf/graceful_shutdown_python.slt create mode 100644 src/udf/python/CHANGELOG.md diff --git a/ci/scripts/run-e2e-test.sh b/ci/scripts/run-e2e-test.sh index 8fb29ec5bd1a2..d6d26e0236ad0 100755 --- a/ci/scripts/run-e2e-test.sh +++ b/ci/scripts/run-e2e-test.sh @@ -82,13 +82,15 @@ sqllogictest -p 4566 -d dev './e2e_test/superset/*.slt' --junit "batch-${profile echo "--- e2e, $mode, python udf" python3 e2e_test/udf/test.py & -sleep 2 +sleep 1 sqllogictest -p 4566 -d dev './e2e_test/udf/udf.slt' pkill python3 +sqllogictest -p 4566 -d dev './e2e_test/udf/graceful_shutdown_python.slt' + echo "--- e2e, $mode, java udf" java -jar risingwave-udf-example.jar & -sleep 2 +sleep 1 sqllogictest -p 4566 -d dev './e2e_test/udf/udf.slt' pkill java diff --git a/e2e_test/udf/graceful_shutdown_python.slt b/e2e_test/udf/graceful_shutdown_python.slt new file mode 100644 index 0000000000000..139a3fd284981 --- /dev/null +++ b/e2e_test/udf/graceful_shutdown_python.slt @@ -0,0 +1,36 @@ +system ok +python3 e2e_test/udf/test.py & + +# wait for server to start +sleep 1s + +statement ok +CREATE FUNCTION sleep(INT) RETURNS INT AS 'sleep' USING LINK 'http://localhost:8815'; + +system ok +sleep 1 && pkill python & + +# python server should not exit until the query is finished +query I +select sleep(2); +---- +0 + +# wait for server to exit +sleep 1s + +system ok +python3 
e2e_test/udf/test.py & + +# wait for server to start +sleep 1s + +# force kill python server +system ok +sleep 1 && pkill -9 python & + +query error +select sleep(2); + +statement ok +DROP FUNCTION sleep; diff --git a/e2e_test/udf/test.py b/e2e_test/udf/test.py index c34c65e3c232c..45db54a8113b3 100644 --- a/e2e_test/udf/test.py +++ b/e2e_test/udf/test.py @@ -15,6 +15,7 @@ import socket import struct import sys +import time from typing import Iterator, List, Optional, Tuple, Any from decimal import Decimal @@ -28,6 +29,12 @@ def int_42() -> int: return 42 +@udf(input_types=["INT"], result_type="INT") +def sleep(s: int) -> int: + time.sleep(s) + return 0 + + @udf(input_types=["INT", "INT"], result_type="INT") def gcd(x: int, y: int) -> int: while y != 0: @@ -190,8 +197,9 @@ def return_all_arrays( if __name__ == "__main__": - server = UdfServer(location="0.0.0.0:8815") + server = UdfServer(location="localhost:8815") server.add_function(int_42) + server.add_function(sleep) server.add_function(gcd) server.add_function(gcd3) server.add_function(series) diff --git a/java/udf-example/src/main/java/com/example/UdfExample.java b/java/udf-example/src/main/java/com/example/UdfExample.java index eed88d3dda281..3f07f457fd81b 100644 --- a/java/udf-example/src/main/java/com/example/UdfExample.java +++ b/java/udf-example/src/main/java/com/example/UdfExample.java @@ -35,6 +35,7 @@ public class UdfExample { public static void main(String[] args) throws IOException { try (var server = new UdfServer("0.0.0.0", 8815)) { server.addFunction("int_42", new Int42()); + server.addFunction("sleep", new Sleep()); server.addFunction("gcd", new Gcd()); server.addFunction("gcd3", new Gcd3()); server.addFunction("extract_tcp_info", new ExtractTcpInfo()); @@ -62,6 +63,17 @@ public int eval() { } } + public static class Sleep implements ScalarFunction { + public int eval(int x) { + try { + Thread.sleep(x * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return 0; + } + } + 
public static class Gcd implements ScalarFunction { public int eval(int a, int b) { while (b != 0) { diff --git a/src/udf/python/CHANGELOG.md b/src/udf/python/CHANGELOG.md new file mode 100644 index 0000000000000..3c788857a395b --- /dev/null +++ b/src/udf/python/CHANGELOG.md @@ -0,0 +1,14 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [Unreleased] + +## [0.0.11] - 2023-11-06 + +### Fixed + +- Hook SIGTERM to stop the UDF server gracefully. diff --git a/src/udf/python/pyproject.toml b/src/udf/python/pyproject.toml index d3093156eff56..de9b245175f9a 100644 --- a/src/udf/python/pyproject.toml +++ b/src/udf/python/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "risingwave" -version = "0.0.10" +version = "0.0.11" authors = [{ name = "RisingWave Labs" }] description = "RisingWave Python API" readme = "README.md" diff --git a/src/udf/python/risingwave/udf.py b/src/udf/python/risingwave/udf.py index 758421b1db68d..ea7476a756d6c 100644 --- a/src/udf/python/risingwave/udf.py +++ b/src/udf/python/risingwave/udf.py @@ -21,6 +21,7 @@ import json from concurrent.futures import ThreadPoolExecutor import concurrent +import signal class UserDefinedFunction: @@ -392,11 +393,16 @@ def do_exchange(self, context, descriptor, reader, writer): raise e def serve(self): - """Start the server.""" + """ + Block until the server shuts down. + + This method only returns if shutdown() is called or a signal (SIGINT, SIGTERM) is received. + """ print( "Note: You can use arbitrary function names and struct field names in CREATE FUNCTION statements." f"\n\nlistening on {self._location}" ) + signal.signal(signal.SIGTERM, lambda s, f: self.shutdown()) super(UdfServer, self).serve()