Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(udf): add retry on UDF connection error #13375

Merged
merged 7 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ pkill python3

sqllogictest -p 4566 -d dev './e2e_test/udf/alter_function.slt'
sqllogictest -p 4566 -d dev './e2e_test/udf/graceful_shutdown_python.slt'
sqllogictest -p 4566 -d dev './e2e_test/udf/retry_python.slt'

echo "--- e2e, $mode, java udf"
java -jar risingwave-udf-example.jar &
Expand Down
35 changes: 35 additions & 0 deletions e2e_test/udf/retry_python.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
system ok
python3 e2e_test/udf/test.py &

# wait for server to start
sleep 1s

statement ok
CREATE FUNCTION sleep(INT) RETURNS INT AS 'sleep' USING LINK 'http://localhost:8815';

# restart the server
system ok
pkill -9 python && python3 e2e_test/udf/test.py &

# query should not be affected
query I
select sleep(0);
----
0

# restart the server after 1s
system ok
sleep 1 && pkill -9 python && python3 e2e_test/udf/test.py &

# query should not be affected
query I
select sleep(2);
----
0

# close the server
system ok
pkill python

statement ok
DROP FUNCTION sleep;
2 changes: 1 addition & 1 deletion src/expr/core/src/expr/expr_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl UdfExpression {

let output = self
.client
.call(&self.identifier, input)
.call_with_retry(&self.identifier, input)
.instrument_await(self.span.clone())
.await?;
if output.num_rows() != vis.count_ones() {
Expand Down
6 changes: 5 additions & 1 deletion src/udf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ futures-util = "0.3.28"
static_assertions = "1"
thiserror = "1"
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "macros"] }
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
"macros",
] }
tonic = { workspace = true }
tracing = "0.1"

[lints]
workspace = true
17 changes: 17 additions & 0 deletions src/udf/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,21 @@ pub enum ErrorInner {
ServiceError(String),
}

impl Error {
/// Returns true if the error is caused by a connection error.
pub fn is_connection_error(&self) -> bool {
match self.inner() {
// stream closed because of a broken pipe
ErrorInner::Flight(FlightError::Tonic(status))
if status.code() == tonic::Code::Unknown =>
{
true
}
// Connection refused
ErrorInner::Tonic(status) if status.code() == tonic::Code::Unavailable => true,
_ => false,
}
}
}

static_assertions::const_assert_eq!(std::mem::size_of::<Error>(), 8);
49 changes: 36 additions & 13 deletions src/udf/src/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use arrow_array::RecordBatch;
use arrow_flight::decode::FlightRecordBatchStream;
use arrow_flight::encode::FlightDataEncoderBuilder;
Expand All @@ -36,13 +38,21 @@ pub struct ArrowFlightUdfClient {
impl ArrowFlightUdfClient {
/// Connect to a UDF service.
pub async fn connect(addr: &str) -> Result<Self> {
let client = FlightServiceClient::connect(addr.to_string()).await?;
let conn = tonic::transport::Endpoint::new(addr.to_string())?
.timeout(Duration::from_secs(5))
.connect_timeout(Duration::from_secs(5))
.connect()
.await?;
let client = FlightServiceClient::new(conn);
Ok(Self { client })
}

/// Connect to a UDF service lazily (i.e. only when the first request is sent).
pub fn connect_lazy(addr: &str) -> Result<Self> {
let conn = tonic::transport::Endpoint::new(addr.to_string())?.connect_lazy();
let conn = tonic::transport::Endpoint::new(addr.to_string())?
wangrunji0408 marked this conversation as resolved.
Show resolved Hide resolved
.timeout(Duration::from_secs(5))
.connect_timeout(Duration::from_secs(5))
.connect_lazy();
let client = FlightServiceClient::new(conn);
Ok(Self { client })
}
Expand Down Expand Up @@ -109,6 +119,22 @@ impl ArrowFlightUdfClient {
}
}

/// Call a function, retry up to 5 times / 3s if connection is broken.
pub async fn call_with_retry(&self, id: &str, input: RecordBatch) -> Result<RecordBatch> {
let mut backoff = Duration::from_millis(100);
for i in 0..5 {
match self.call(id, input.clone()).await {
Err(err) if err.is_connection_error() && i != 4 => {
tracing::error!(%err, "UDF connection error. retry...");
}
ret => return ret,
}
tokio::time::sleep(backoff).await;
backoff *= 2;
}
unreachable!()
stdrc marked this conversation as resolved.
Show resolved Hide resolved
}

/// Call a function with streaming input and output.
#[panic_return = "Result<stream::Empty<_>>"]
pub async fn call_stream(
Expand All @@ -117,17 +143,14 @@ impl ArrowFlightUdfClient {
inputs: impl Stream<Item = RecordBatch> + Send + 'static,
) -> Result<impl Stream<Item = Result<RecordBatch>> + Send + 'static> {
let descriptor = FlightDescriptor::new_path(vec![id.into()]);
let flight_data_stream = FlightDataEncoderBuilder::new()
// XXX(wrj): unlimit the size of flight data to avoid splitting batch
// there's a bug in arrow-flight when splitting batch with list type array
// FIXME: remove this when the bug is fixed in arrow-flight
.with_max_flight_data_size(usize::MAX)
.build(inputs.map(Ok))
.map(move |res| FlightData {
// TODO: fill descriptor only for the first message
flight_descriptor: Some(descriptor.clone()),
..res.unwrap()
});
let flight_data_stream =
FlightDataEncoderBuilder::new()
.build(inputs.map(Ok))
.map(move |res| FlightData {
// TODO: fill descriptor only for the first message
flight_descriptor: Some(descriptor.clone()),
..res.unwrap()
});

// call `do_exchange` on Flight server
let response = self.client.clone().do_exchange(flight_data_stream).await?;
Expand Down
Loading