Skip to content

Commit

Permalink
On first request set our own operationId so reattachexecute works wit…
Browse files Browse the repository at this point in the history
…h spark 3.5.3 (#42)
  • Loading branch information
GoEddie authored Nov 27, 2024
1 parent 488199d commit 380f439
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 18 deletions.
14 changes: 13 additions & 1 deletion docs/options.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,16 @@ To output metrics as responses are received enable:

```csharp
spark.Conf.Set("spark.connect.dotnet.showmetrics", "true");
```
```

## Request Wait Timeout

When we send a request to the server, we need to cancel the connection and retry after X seconds, because Azure Databricks will kill idle TCP connections. To handle this we send a request, wait X seconds and, if the response has not completed, re-connect to the running query — and keep re-connecting every X seconds until it does.

To change how long we wait before killing the connection and re-connecting you can set:

```csharp
spark.Conf.Set("spark.connect.dotnet.requestretrytimelimit", "30");
```

The setting is the number of seconds; it defaults to 45. It isn't really needed for Spark 4, which has a form of keep-alive where the server sends a response even while it is busy doing something.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class RequestExecutor : IDisposable
private readonly SparkSession _session;
private readonly Plan _plan;
private readonly GrpcLogger _logger;

private string _operationId = string.Empty;
private string _lastResponseId = string.Empty;
private bool _isComplete = false;
Expand Down Expand Up @@ -84,21 +84,22 @@ public async Task ExecAsync()
while (shouldContinue && !_isComplete)
{
shouldContinue = await ProcessRequest();
_logger.Log(GrpcLoggingLevel.Verbose, $" Processed Request, continue?: {shouldContinue}");
_logger.Log(GrpcLoggingLevel.Verbose, "Processed Request, continue?: {0} {1} {2} {3}", shouldContinue, _session.SessionId, _operationId, _lastResponseId);
}
}

/// <summary>
/// Creates a cancellation token that fires after the configured request
/// timeout so an idle connection can be dropped and the operation reattached.
/// The timeout comes from the "requestretrytimelimit" config key and defaults
/// to 45 seconds (see docs/options.md).
/// </summary>
private CancellationToken GetScheduledCancellationToken()
{
    var cancelTime = int.Parse(_session.Conf.GetOrDefault(SparkDotnetKnownConfigKeys.RequestExecutorCancelTimeout, "45"));
    _currentCancellationSource = new CancellationTokenSource();
    // Stale diff residue removed: only the configurable FromSeconds timeout is
    // applied; the old hard-coded FromMinutes(1) call is gone.
    _currentCancellationSource.CancelAfter(TimeSpan.FromSeconds(cancelTime));
    return _currentCancellationSource.Token;
}

private async Task<bool> ProcessRequest()
{
_logger.Log(GrpcLoggingLevel.Verbose, $" Processing Request");
_logger.Log(GrpcLoggingLevel.Verbose, "Processing Request {0} {1} {2}", _session.SessionId, _operationId, _lastResponseId);

try
{
/// <summary>
/// Starts the server-side stream for the current plan. On the first call
/// (no operation id yet) it issues ExecutePlan; on later calls it issues
/// ReattachExecute against the operation id we generated up front.
/// </summary>
private AsyncServerStreamingCall<ExecutePlanResponse> GetResponse()
{
    if (_operationId == string.Empty)
    {
        // First request: CreateRequest assigns _operationId before we log it.
        var request = CreateRequest();
        _logger.Log(GrpcLoggingLevel.Verbose, "Calling Execute Plan on session {0} with operation id {1}", _session.SessionId, _operationId);
        return _session.GrpcClient.ExecutePlan(request, _session.Headers, null, GetScheduledCancellationToken());
    }
    else
    {
        // Reattach to the operation that is already running on the server.
        var request = CreateReattachRequest();
        _logger.Log(GrpcLoggingLevel.Verbose, "Calling ReattachExecute Plan on session {0} with operation id {1}", _session.SessionId, _operationId);
        return _session.GrpcClient.ReattachExecute(request, _session.Headers, null, GetScheduledCancellationToken());
    }
}
/// <summary>
/// Builds the initial ExecutePlanRequest. We generate our own operation id
/// (rather than letting the server assign one) so that ReattachExecute works
/// with Spark 3.5.3, and mark the request as reattachable.
/// </summary>
private ExecutePlanRequest CreateRequest()
{
    _operationId = Guid.NewGuid().ToString();

    return new()
    {
        OperationId = _operationId,
        Plan = _plan,
        ClientType = _session.ClientType,
        SessionId = _session.SessionId,
        UserContext = _session.UserContext,
        RequestOptions =
        {
            new ExecutePlanRequest.Types.RequestOption()
            {
                ReattachOptions = new ReattachOptions()
                {
                    Reattachable = true
                }
            }
        }
    };
}

/// <summary>
/// Builds a ReattachExecuteRequest for the current operation. If we have not
/// yet received any response, LastResponseId is omitted entirely rather than
/// sent as an empty string.
/// NOTE(review): this assumes the server treats an empty LastResponseId
/// differently from an absent one — confirm against the Spark Connect proto.
/// </summary>
private ReattachExecuteRequest CreateReattachRequest()
{
    if (_lastResponseId == string.Empty)
    {
        return new()
        {
            ClientType = _session.ClientType,
            SessionId = _session.SessionId,
            UserContext = _session.UserContext,
            OperationId = _operationId
        };
    }

    return new()
    {
        ClientType = _session.ClientType,
        SessionId = _session.SessionId,
        UserContext = _session.UserContext,
        OperationId = _operationId,
        LastResponseId = _lastResponseId
    };
}

private ReleaseExecuteRequest CreateReleaseRequest() => new()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ public static class SparkDotnetKnownConfigKeys
public const string GrpcLogging = RuntimeConf.SparkDotnetConfigKey + "grpclogging";
public const string PrintMetrics = RuntimeConf.SparkDotnetConfigKey + "showmetrics";
public const string DontDecodeArrow = RuntimeConf.SparkDotnetConfigKey + "dontdecodearrow";
public const string RequestExecutorCancelTimeout = RuntimeConf.SparkDotnetConfigKey + "requestretrytimelimit";
}

public class RuntimeConf
Expand Down Expand Up @@ -240,6 +241,16 @@ public void Unset(string key)
task.Wait();
}

/// <summary>
/// Looks up <paramref name="key"/> in the session configuration and returns
/// its value, falling back to <paramref name="defaultValue"/> when the key is
/// unset or empty.
/// </summary>
public string GetOrDefault(string key, string defaultValue)
{
    var configured = Get(key);
    return string.IsNullOrEmpty(configured) ? defaultValue : configured;
}
public string Get(string key)
{
if (key.ToLowerInvariant().StartsWith(SparkDotnetConfigKey))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using Spark.Connect.Dotnet.Sql;
using Xunit.Abstractions;
using static Spark.Connect.Dotnet.Sql.Functions;

namespace Spark.Connect.Dotnet.Tests.RequestExecutorTests;

public class RequestExecutorTests : E2ETestBase
{
    public RequestExecutorTests(ITestOutputHelper logger) : base(logger)
    {
    }

    // Makes the server sleep for 160 seconds via java.lang.Thread.sleep so the
    // first response takes longer than the executor's cancel timeout, forcing
    // a reattach. Skipped by default because of the long pause.
    [Fact(Skip = "Pauses for 2 mins - run when testing RequestExecutor")]
    public void Initial_Request_Takes_Longer_Than_Timeout()
    {
        Spark.Conf.Set(SparkDotnetKnownConfigKeys.GrpcLogging, "console");

        var sleepMillis = Lit((long)160000);
        var slowSelect = Spark.Range(1).Select(Reflect(Lit("java.lang.Thread"), Lit("sleep"), sleepMillis));
        slowSelect.Show();
    }
}

0 comments on commit 380f439

Please sign in to comment.