From 73933ac416a11004afdb2d45fb26e76d63e57321 Mon Sep 17 00:00:00 2001 From: Ed Elliott Date: Wed, 27 Nov 2024 19:43:15 +0000 Subject: [PATCH] On first request set our own operationId so reattachexecute works with spark 3.5.3 --- docs/options.md | 14 ++++- .../Grpc/RequestExecutor.cs | 63 ++++++++++++++----- .../Sql/SparkSessionBuilder.cs | 11 ++++ .../RequestExecutorTests.cs | 20 ++++++ 4 files changed, 90 insertions(+), 18 deletions(-) create mode 100644 src/test/Spark.Connect.Dotnet.Tests/RequestExecutorTests/RequestExecutorTests.cs diff --git a/docs/options.md b/docs/options.md index 1cee90b..3f1c8c8 100644 --- a/docs/options.md +++ b/docs/options.md @@ -65,4 +65,16 @@ To output metrics as responses are reveived enable: ```csharp spark.Conf.Set("spark.connect.dotnet.showmetrics", "true"); -``` \ No newline at end of file +``` + +## Request Wait Timeout + +When we send a request to the server, we need to kill the connection and retry after X seconds, because Azure Databricks will kill an idle TCP connection. To handle this we send a request, wait X seconds and, if the response has not completed, re-connect to the running query and keep re-connecting every X seconds. + +To change how long we wait before killing the connection and re-connecting, you can set: + +```csharp +spark.Conf.Set("spark.connect.dotnet.requestretrytimelimit", "30"); +``` + +The value is the number of seconds to wait; it defaults to 45. It isn't really needed for Spark 4, which has a keep-alive of sorts: it sends a response even while the server is busy doing something. 
\ No newline at end of file diff --git a/src/Spark.Connect.Dotnet/Spark.Connect.Dotnet/Grpc/RequestExecutor.cs b/src/Spark.Connect.Dotnet/Spark.Connect.Dotnet/Grpc/RequestExecutor.cs index 21d6c1b..ff7384c 100644 --- a/src/Spark.Connect.Dotnet/Spark.Connect.Dotnet/Grpc/RequestExecutor.cs +++ b/src/Spark.Connect.Dotnet/Spark.Connect.Dotnet/Grpc/RequestExecutor.cs @@ -24,7 +24,7 @@ public class RequestExecutor : IDisposable private readonly SparkSession _session; private readonly Plan _plan; private readonly GrpcLogger _logger; - + private string _operationId = string.Empty; private string _lastResponseId = string.Empty; private bool _isComplete = false; @@ -84,21 +84,22 @@ public async Task ExecAsync() while (shouldContinue && !_isComplete) { shouldContinue = await ProcessRequest(); - _logger.Log(GrpcLoggingLevel.Verbose, $" Processed Request, continue?: {shouldContinue}"); + _logger.Log(GrpcLoggingLevel.Verbose, "Processed Request, continue?: {0} {1} {2} {3}", shouldContinue, _session.SessionId, _operationId, _lastResponseId); } } private CancellationToken GetScheduledCancellationToken() { + var cancelTime = int.Parse(_session.Conf.GetOrDefault(SparkDotnetKnownConfigKeys.RequestExecutorCancelTimeout, "45")); _currentCancellationSource = new CancellationTokenSource(); - _currentCancellationSource.CancelAfter(TimeSpan.FromMinutes(1)); + _currentCancellationSource.CancelAfter(TimeSpan.FromSeconds(cancelTime)); var token = _currentCancellationSource.Token; return token; } private async Task ProcessRequest() { - _logger.Log(GrpcLoggingLevel.Verbose, $" Processing Request"); + _logger.Log(GrpcLoggingLevel.Verbose, "Processing Request {0} {1} {2}", _session.SessionId, _operationId, _lastResponseId); try { @@ -254,35 +255,63 @@ private AsyncServerStreamingCall GetResponse() if (_operationId == string.Empty) { var request = CreateRequest(); - _logger.Log(GrpcLoggingLevel.Verbose, "Calling Execute Plan on session {0}", _session.SessionId); + 
_logger.Log(GrpcLoggingLevel.Verbose, "Calling Execute Plan on session {0} with operation id {1}", _session.SessionId, _operationId); return _session.GrpcClient.ExecutePlan(request, _session.Headers, null, GetScheduledCancellationToken()); } else { var request = CreateReattachRequest(); - _logger.Log(GrpcLoggingLevel.Verbose, "Calling ReattachExecute Plan on session {0}", _session.SessionId); + _logger.Log(GrpcLoggingLevel.Verbose, "Calling ReattachExecute Plan on session {0} with operation id {1}", _session.SessionId, _operationId); return _session.GrpcClient.ReattachExecute(request, _session.Headers, null, GetScheduledCancellationToken()); } } - - private ExecutePlanRequest CreateRequest() => new() + + private ExecutePlanRequest CreateRequest() { - Plan = _plan, ClientType = _session.ClientType, SessionId = _session.SessionId, UserContext = _session.UserContext, RequestOptions = + _operationId = Guid.NewGuid().ToString(); + + return new() { - new ExecutePlanRequest.Types.RequestOption() + OperationId = _operationId, + Plan = _plan, + ClientType = _session.ClientType, + SessionId = _session.SessionId, + UserContext = _session.UserContext, + RequestOptions = { - ReattachOptions = new ReattachOptions() + new ExecutePlanRequest.Types.RequestOption() { - Reattachable = true + ReattachOptions = new ReattachOptions() + { + Reattachable = true + } } } - } - }; + }; + } - private ReattachExecuteRequest CreateReattachRequest() => new() + private ReattachExecuteRequest CreateReattachRequest() { - ClientType = _session.ClientType, SessionId = _session.SessionId, UserContext = _session.UserContext, OperationId = _operationId, LastResponseId = _lastResponseId - }; + if (_lastResponseId == string.Empty) + { + return new() + { + ClientType = _session.ClientType, + SessionId = _session.SessionId, + UserContext = _session.UserContext, + OperationId = _operationId + }; + } + + return new() + { + ClientType = _session.ClientType, + SessionId = _session.SessionId, + UserContext = 
_session.UserContext, + OperationId = _operationId, + LastResponseId = _lastResponseId + }; + } private ReleaseExecuteRequest CreateReleaseRequest() => new() { diff --git a/src/Spark.Connect.Dotnet/Spark.Connect.Dotnet/Sql/SparkSessionBuilder.cs b/src/Spark.Connect.Dotnet/Spark.Connect.Dotnet/Sql/SparkSessionBuilder.cs index 4ae3825..77798fd 100644 --- a/src/Spark.Connect.Dotnet/Spark.Connect.Dotnet/Sql/SparkSessionBuilder.cs +++ b/src/Spark.Connect.Dotnet/Spark.Connect.Dotnet/Sql/SparkSessionBuilder.cs @@ -185,6 +185,7 @@ public static class SparkDotnetKnownConfigKeys public const string GrpcLogging = RuntimeConf.SparkDotnetConfigKey + "grpclogging"; public const string PrintMetrics = RuntimeConf.SparkDotnetConfigKey + "showmetrics"; public const string DontDecodeArrow = RuntimeConf.SparkDotnetConfigKey + "dontdecodearrow"; + public const string RequestExecutorCancelTimeout = RuntimeConf.SparkDotnetConfigKey + "requestretrytimelimit"; } public class RuntimeConf @@ -240,6 +241,16 @@ public void Unset(string key) task.Wait(); } + public string GetOrDefault(string key, string defaultValue) + { + var value = Get(key); + if (string.IsNullOrEmpty(value)) + { + return defaultValue; + } + + return value; + } public string Get(string key) { if (key.ToLowerInvariant().StartsWith(SparkDotnetConfigKey)) diff --git a/src/test/Spark.Connect.Dotnet.Tests/RequestExecutorTests/RequestExecutorTests.cs b/src/test/Spark.Connect.Dotnet.Tests/RequestExecutorTests/RequestExecutorTests.cs new file mode 100644 index 0000000..55aa677 --- /dev/null +++ b/src/test/Spark.Connect.Dotnet.Tests/RequestExecutorTests/RequestExecutorTests.cs @@ -0,0 +1,20 @@ +using Spark.Connect.Dotnet.Sql; +using Xunit.Abstractions; +using static Spark.Connect.Dotnet.Sql.Functions; + +namespace Spark.Connect.Dotnet.Tests.RequestExecutorTests; + +public class RequestExecutorTests : E2ETestBase +{ + + public RequestExecutorTests(ITestOutputHelper logger) : base(logger) + { + } + + [Fact(Skip = "Pauses for 2 mins - 
run when testing RequestExecutor")] + public void Initial_Request_Takes_Longer_Than_Timeout() + { + Spark.Conf.Set(SparkDotnetKnownConfigKeys.GrpcLogging, "console"); + Spark.Range(1).Select(Reflect(Lit("java.lang.Thread"), Lit("sleep"), Lit((long)160000))).Show(); + } +} \ No newline at end of file