Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

On first request set our own operationId so reattachexecute works with spark 3.5.3 #42

Merged
merged 1 commit into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion docs/options.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,16 @@ To output metrics as responses are reveived enable:

```csharp
spark.Conf.Set("spark.connect.dotnet.showmetrics", "true");
```
```

## Request Wait Timeout

When we send a request to the server, we need to kill the connection and retry after X seconds because if we have an idle connection then Azure Databricks will kill the tcp connection. To handle this we send a request, wait x seconds and if we haven't completed the response we re-connect to the running query and keep re-connecting every x seconds.

To change how long we wait before killing the connection and re-connecting you can set:

```csharp
spark.Conf.Set("spark.connect.dotnet.requestretrytimelimit", "30");
```

The setting is the amount of seconds, it defaults to 45. It isn't really needed for Spark 4 as it has a sort of keep alive where it sends a response even while the server is busy doing something.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class RequestExecutor : IDisposable
private readonly SparkSession _session;
private readonly Plan _plan;
private readonly GrpcLogger _logger;

private string _operationId = string.Empty;
private string _lastResponseId = string.Empty;
private bool _isComplete = false;
Expand Down Expand Up @@ -84,21 +84,22 @@ public async Task ExecAsync()
while (shouldContinue && !_isComplete)
{
shouldContinue = await ProcessRequest();
_logger.Log(GrpcLoggingLevel.Verbose, $" Processed Request, continue?: {shouldContinue}");
_logger.Log(GrpcLoggingLevel.Verbose, "Processed Request, continue?: {0} {1} {2} {3}", shouldContinue, _session.SessionId, _operationId, _lastResponseId);
}
}

private CancellationToken GetScheduledCancellationToken()
{
var cancelTime = int.Parse(_session.Conf.GetOrDefault(SparkDotnetKnownConfigKeys.RequestExecutorCancelTimeout, "45"));
_currentCancellationSource = new CancellationTokenSource();
_currentCancellationSource.CancelAfter(TimeSpan.FromMinutes(1));
_currentCancellationSource.CancelAfter(TimeSpan.FromSeconds(cancelTime));
var token = _currentCancellationSource.Token;
return token;
}

private async Task<bool> ProcessRequest()
{
_logger.Log(GrpcLoggingLevel.Verbose, $" Processing Request");
_logger.Log(GrpcLoggingLevel.Verbose, "Processing Request {0} {1} {2}", _session.SessionId, _operationId, _lastResponseId);

try
{
Expand Down Expand Up @@ -254,35 +255,63 @@ private AsyncServerStreamingCall<ExecutePlanResponse> GetResponse()
if (_operationId == string.Empty)
{
var request = CreateRequest();
_logger.Log(GrpcLoggingLevel.Verbose, "Calling Execute Plan on session {0}", _session.SessionId);
_logger.Log(GrpcLoggingLevel.Verbose, "Calling Execute Plan on session {0} with operation id {1}", _session.SessionId, _operationId);
return _session.GrpcClient.ExecutePlan(request, _session.Headers, null, GetScheduledCancellationToken());
}
else
{
var request = CreateReattachRequest();
_logger.Log(GrpcLoggingLevel.Verbose, "Calling ReattachExecute Plan on session {0}", _session.SessionId);
_logger.Log(GrpcLoggingLevel.Verbose, "Calling ReattachExecute Plan on session {0} with operation id {1}", _session.SessionId, _operationId);
return _session.GrpcClient.ReattachExecute(request, _session.Headers, null, GetScheduledCancellationToken());
}
}
private ExecutePlanRequest CreateRequest() => new()

private ExecutePlanRequest CreateRequest()
{
Plan = _plan, ClientType = _session.ClientType, SessionId = _session.SessionId, UserContext = _session.UserContext, RequestOptions =
_operationId = Guid.NewGuid().ToString();

return new()
{
new ExecutePlanRequest.Types.RequestOption()
OperationId = _operationId,
Plan = _plan,
ClientType = _session.ClientType,
SessionId = _session.SessionId,
UserContext = _session.UserContext,
RequestOptions =
{
ReattachOptions = new ReattachOptions()
new ExecutePlanRequest.Types.RequestOption()
{
Reattachable = true
ReattachOptions = new ReattachOptions()
{
Reattachable = true
}
}
}
}
};
};
}

private ReattachExecuteRequest CreateReattachRequest() => new()
private ReattachExecuteRequest CreateReattachRequest()
{
ClientType = _session.ClientType, SessionId = _session.SessionId, UserContext = _session.UserContext, OperationId = _operationId, LastResponseId = _lastResponseId
};
if (_lastResponseId == string.Empty)
{
return new()
{
ClientType = _session.ClientType,
SessionId = _session.SessionId,
UserContext = _session.UserContext,
OperationId = _operationId
};
}

return new()
{
ClientType = _session.ClientType,
SessionId = _session.SessionId,
UserContext = _session.UserContext,
OperationId = _operationId,
LastResponseId = _lastResponseId
};
}

private ReleaseExecuteRequest CreateReleaseRequest() => new()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ public static class SparkDotnetKnownConfigKeys
public const string GrpcLogging = RuntimeConf.SparkDotnetConfigKey + "grpclogging";
public const string PrintMetrics = RuntimeConf.SparkDotnetConfigKey + "showmetrics";
public const string DontDecodeArrow = RuntimeConf.SparkDotnetConfigKey + "dontdecodearrow";
public const string RequestExecutorCancelTimeout = RuntimeConf.SparkDotnetConfigKey + "requestretrytimelimit";
}

public class RuntimeConf
Expand Down Expand Up @@ -240,6 +241,16 @@ public void Unset(string key)
task.Wait();
}

public string GetOrDefault(string key, string defaultValue)
{
var value = Get(key);
if (string.IsNullOrEmpty(value))
{
return defaultValue;
}

return value;
}
public string Get(string key)
{
if (key.ToLowerInvariant().StartsWith(SparkDotnetConfigKey))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using Spark.Connect.Dotnet.Sql;
using Xunit.Abstractions;
using static Spark.Connect.Dotnet.Sql.Functions;

namespace Spark.Connect.Dotnet.Tests.RequestExecutorTests;

public class RequestExecutorTests : E2ETestBase
{

public RequestExecutorTests(ITestOutputHelper logger) : base(logger)
{
}

[Fact(Skip = "Pauses for 2 mins - run when testing RequestExecutor")]
public void Initial_Request_Takes_Longer_Than_Timeout()
{
Spark.Conf.Set(SparkDotnetKnownConfigKeys.GrpcLogging, "console");
Spark.Range(1).Select(Reflect(Lit("java.lang.Thread"), Lit("sleep"), Lit((long)160000))).Show();
}
}
Loading