Skip to content

Commit

Permalink
fix(udf): add graceful shutdown for python UDF server (#13285)
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 authored Nov 8, 2023
1 parent 8fae5b5 commit 61c3e2c
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 5 deletions.
6 changes: 4 additions & 2 deletions ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,15 @@ sqllogictest -p 4566 -d dev './e2e_test/superset/*.slt' --junit "batch-${profile

echo "--- e2e, $mode, python udf"
python3 e2e_test/udf/test.py &
sleep 2
sleep 1
sqllogictest -p 4566 -d dev './e2e_test/udf/udf.slt'
pkill python3

sqllogictest -p 4566 -d dev './e2e_test/udf/graceful_shutdown_python.slt'

echo "--- e2e, $mode, java udf"
java -jar risingwave-udf-example.jar &
sleep 2
sleep 1
sqllogictest -p 4566 -d dev './e2e_test/udf/udf.slt'
pkill java

Expand Down
36 changes: 36 additions & 0 deletions e2e_test/udf/graceful_shutdown_python.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
system ok
python3 e2e_test/udf/test.py &

# wait for server to start
sleep 1s

statement ok
CREATE FUNCTION sleep(INT) RETURNS INT AS 'sleep' USING LINK 'http://localhost:8815';

system ok
sleep 1 && pkill python &

# python server should not exit until the query is finished
query I
select sleep(2);
----
0

# wait for server to exit
sleep 1s

system ok
python3 e2e_test/udf/test.py &

# wait for server to start
sleep 1s

# force kill python server
system ok
sleep 1 && pkill -9 python &

query error
select sleep(2);

statement ok
DROP FUNCTION sleep;
10 changes: 9 additions & 1 deletion e2e_test/udf/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import socket
import struct
import sys
import time
from typing import Iterator, List, Optional, Tuple, Any
from decimal import Decimal

Expand All @@ -28,6 +29,12 @@ def int_42() -> int:
return 42


@udf(input_types=["INT"], result_type="INT")
def sleep(s: int) -> int:
time.sleep(s)
return 0


@udf(input_types=["INT", "INT"], result_type="INT")
def gcd(x: int, y: int) -> int:
while y != 0:
Expand Down Expand Up @@ -190,8 +197,9 @@ def return_all_arrays(


if __name__ == "__main__":
server = UdfServer(location="0.0.0.0:8815")
server = UdfServer(location="localhost:8815")
server.add_function(int_42)
server.add_function(sleep)
server.add_function(gcd)
server.add_function(gcd3)
server.add_function(series)
Expand Down
12 changes: 12 additions & 0 deletions java/udf-example/src/main/java/com/example/UdfExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class UdfExample {
public static void main(String[] args) throws IOException {
try (var server = new UdfServer("0.0.0.0", 8815)) {
server.addFunction("int_42", new Int42());
server.addFunction("sleep", new Sleep());
server.addFunction("gcd", new Gcd());
server.addFunction("gcd3", new Gcd3());
server.addFunction("extract_tcp_info", new ExtractTcpInfo());
Expand Down Expand Up @@ -62,6 +63,17 @@ public int eval() {
}
}

public static class Sleep implements ScalarFunction {
public int eval(int x) {
try {
Thread.sleep(x * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 0;
}
}

public static class Gcd implements ScalarFunction {
public int eval(int a, int b) {
while (b != 0) {
Expand Down
14 changes: 14 additions & 0 deletions src/udf/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [0.0.11] - 2023-11-06

### Fixed

- Hook SIGTERM to stop the UDF server gracefully.
2 changes: 1 addition & 1 deletion src/udf/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "risingwave"
version = "0.0.10"
version = "0.0.11"
authors = [{ name = "RisingWave Labs" }]
description = "RisingWave Python API"
readme = "README.md"
Expand Down
8 changes: 7 additions & 1 deletion src/udf/python/risingwave/udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import json
from concurrent.futures import ThreadPoolExecutor
import concurrent
import signal


class UserDefinedFunction:
Expand Down Expand Up @@ -392,11 +393,16 @@ def do_exchange(self, context, descriptor, reader, writer):
raise e

def serve(self):
"""Start the server."""
"""
Block until the server shuts down.
This method only returns if shutdown() is called or a signal (SIGINT, SIGTERM) received.
"""
print(
"Note: You can use arbitrary function names and struct field names in CREATE FUNCTION statements."
f"\n\nlistening on {self._location}"
)
signal.signal(signal.SIGTERM, lambda s, f: self.shutdown())
super(UdfServer, self).serve()


Expand Down

0 comments on commit 61c3e2c

Please sign in to comment.