Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

code function docs #47

Merged
merged 1 commit into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
namespace Spark.Connect.Dotnet.Databricks;

/// <summary>
/// Are we connecting to Databricks? used to control whether we wait for a connection.
/// </summary>
public class IsDatabricks
{
/// <summary>
/// Valiudates whether the url contains the name 'databricks'
/// </summary>
/// <param name="url"></param>
/// <returns></returns>
public static bool Url(string url)
{
return url.Contains("databricks", StringComparison.OrdinalIgnoreCase);
Expand Down
102 changes: 102 additions & 0 deletions src/Spark.Connect.Dotnet/Spark.Connect.Dotnet/Grpc/GrpcInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,23 @@

namespace Spark.Connect.Dotnet.Grpc;

/// <summary>
/// This is the class used to pass messages down the gRPC channel.
/// </summary>
public static class GrpcInternal
{
/// <summary>
/// Explain
/// </summary>
/// <param name="client"></param>
/// <param name="sessionId"></param>
/// <param name="plan"></param>
/// <param name="headers"></param>
/// <param name="userContext"></param>
/// <param name="clientType"></param>
/// <param name="explainExtended"></param>
/// <param name="mode"></param>
/// <returns></returns>
public static string Explain(SparkConnectService.SparkConnectServiceClient client, string sessionId, Plan plan,
Metadata headers, UserContext userContext, string clientType, bool explainExtended, string? mode)
{
Expand All @@ -34,6 +49,13 @@ public static string Explain(SparkConnectService.SparkConnectServiceClient clien
return analyzeResponse.Explain.ExplainString;
}

/// <summary>
/// Persist
/// </summary>
/// <param name="session"></param>
/// <param name="relation"></param>
/// <param name="storageLevel"></param>
/// <returns></returns>
public static Relation Persist(SparkSession session, Relation relation, StorageLevel storageLevel)
{
var analyzeRequest = new AnalyzePlanRequest
Expand All @@ -49,6 +71,18 @@ public static Relation Persist(SparkSession session, Relation relation, StorageL
return relation;
}

/// <summary>
/// Schema
/// </summary>
/// <param name="client"></param>
/// <param name="sessionId"></param>
/// <param name="plan"></param>
/// <param name="headers"></param>
/// <param name="userContext"></param>
/// <param name="clientType"></param>
/// <param name="explainExtended"></param>
/// <param name="mode"></param>
/// <returns></returns>
public static DataType Schema(SparkConnectService.SparkConnectServiceClient client, string sessionId, Plan plan,
Metadata headers, UserContext userContext, string clientType, bool explainExtended, string mode)
{
Expand All @@ -65,6 +99,11 @@ public static DataType Schema(SparkConnectService.SparkConnectServiceClient clie
return analyzeResponse.Schema.Schema_;
}

/// <summary>
/// What is the Spark Version you are connect to?
/// </summary>
/// <param name="session"></param>
/// <returns></returns>
public static string Version(SparkSession session)
{
var analyzeRequest = new AnalyzePlanRequest
Expand All @@ -76,6 +115,12 @@ public static string Version(SparkSession session)
return analyzeResponse.SparkVersion.Version;
}

/// <summary>
/// Get a list of the input files
/// </summary>
/// <param name="session"></param>
/// <param name="plan"></param>
/// <returns></returns>
public static IEnumerable<string> InputFiles(SparkSession session, Plan plan)
{
var analyzeRequest = new AnalyzePlanRequest
Expand All @@ -91,6 +136,12 @@ public static IEnumerable<string> InputFiles(SparkSession session, Plan plan)
return analyzeResponse.InputFiles.Files.Select(p => p);
}

/// <summary>
/// Is it a local plan?
/// </summary>
/// <param name="session"></param>
/// <param name="plan"></param>
/// <returns></returns>
public static bool IsLocal(SparkSession session, Plan plan)
{
var analyzeRequest = new AnalyzePlanRequest
Expand All @@ -106,6 +157,13 @@ public static bool IsLocal(SparkSession session, Plan plan)
return analyzeResponse.IsLocal.IsLocal_;
}

/// <summary>
/// Get the TreeString
/// </summary>
/// <param name="session"></param>
/// <param name="relation"></param>
/// <param name="level"></param>
/// <returns></returns>
public static string TreeString(SparkSession session, Relation relation, int? level = null)
{
var analyzeRequest = new AnalyzePlanRequest
Expand All @@ -129,6 +187,12 @@ public static string TreeString(SparkSession session, Relation relation, int? le
return analyzeResponse.TreeString.TreeString_;
}

/// <summary>
/// Create a semantic hash of the relation
/// </summary>
/// <param name="session"></param>
/// <param name="relation"></param>
/// <returns></returns>
public static int SemanticHash(SparkSession session, Relation relation)
{
var analyzeRequest = new AnalyzePlanRequest
Expand All @@ -148,6 +212,12 @@ public static int SemanticHash(SparkSession session, Relation relation)
return analyzeResponse.SemanticHash.Result;
}

/// <summary>
/// What is the storage level?
/// </summary>
/// <param name="session"></param>
/// <param name="relation"></param>
/// <returns></returns>
public static StorageLevel StorageLevel(SparkSession session, Relation relation)
{
var analyzeRequest = new AnalyzePlanRequest
Expand All @@ -164,6 +234,12 @@ public static StorageLevel StorageLevel(SparkSession session, Relation relation)
return analyzeResponse.GetStorageLevel.StorageLevel;
}

/// <summary>
/// Is it a Streaming plan
/// </summary>
/// <param name="session"></param>
/// <param name="plan"></param>
/// <returns></returns>
public static bool IsStreaming(SparkSession session, Plan plan)
{
var analyzeRequest = new AnalyzePlanRequest
Expand All @@ -179,6 +255,13 @@ public static bool IsStreaming(SparkSession session, Plan plan)
return analyzeResponse.IsStreaming.IsStreaming_;
}

/// <summary>
/// Same Semantics, uses AnalyzePlanRequest
/// </summary>
/// <param name="session"></param>
/// <param name="target"></param>
/// <param name="other"></param>
/// <returns></returns>
public static bool SameSemantics(SparkSession session, Relation target, Relation other)
{
var analyzeRequest = new AnalyzePlanRequest
Expand All @@ -201,6 +284,12 @@ public static bool SameSemantics(SparkSession session, Relation target, Relation
return analyzeResponse.SameSemantics.Result;
}

/// <summary>
/// Unset a config option
/// </summary>
/// <param name="session"></param>
/// <param name="key"></param>
/// <exception cref="SparkException"></exception>
public static async Task ExecUnSetConfigCommandResponse(SparkSession session, string key)
{
var configRequest = new ConfigRequest
Expand Down Expand Up @@ -245,6 +334,12 @@ AsyncUnaryCall<ConfigResponse> Exec()
}
}

/// <summary>
/// Set Config Item
/// </summary>
/// <param name="session"></param>
/// <param name="options"></param>
/// <exception cref="SparkException"></exception>
public static async Task ExecSetConfigCommandResponse(SparkSession session, IDictionary<string, string> options)
{
var configRequest = new ConfigRequest
Expand Down Expand Up @@ -295,6 +390,13 @@ AsyncUnaryCall<ConfigResponse> Exec()
}
}

/// <summary>
/// Get All Config Response
/// </summary>
/// <param name="session"></param>
/// <param name="prefix"></param>
/// <returns></returns>
/// <exception cref="SparkException"></exception>
public static async Task<Dictionary<string, string>> ExecGetAllConfigCommandResponse(SparkSession session,
string? prefix = null)
{
Expand Down
50 changes: 50 additions & 0 deletions src/Spark.Connect.Dotnet/Spark.Connect.Dotnet/Grpc/GrpcLogger.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
namespace Spark.Connect.Dotnet.Grpc;

/// <summary>
/// Used to log gRPC info, needs migrating to a better logger.
/// </summary>
public class GrpcLogger
{
private readonly GrpcLoggingLevel _level;
private readonly LocalConsole _console;

/// <summary>
/// Create a logger, pass in your own console if you want to redirect output (like in tests!)
/// </summary>
/// <param name="level"></param>
/// <param name="console"></param>
public GrpcLogger(GrpcLoggingLevel level, LocalConsole? console = null)
{
_level = level;
Expand All @@ -13,6 +21,11 @@ public GrpcLogger(GrpcLoggingLevel level, LocalConsole? console = null)
}


/// <summary>
/// Log
/// </summary>
/// <param name="level"></param>
/// <param name="message"></param>
public virtual void Log(GrpcLoggingLevel level, string message)
{
if (level >= _level)
Expand All @@ -21,6 +34,12 @@ public virtual void Log(GrpcLoggingLevel level, string message)
}
}

/// <summary>
/// Log
/// </summary>
/// <param name="level"></param>
/// <param name="format"></param>
/// <param name="args"></param>
public virtual void Log(GrpcLoggingLevel level, string format, params object[] args)
{
if (level >= _level)
Expand All @@ -30,26 +49,57 @@ public virtual void Log(GrpcLoggingLevel level, string format, params object[] a
}
}

/// <summary>
/// The default logger that does nothing
/// </summary>
public class GrpcNullLogger : GrpcLogger
{
/// <summary>
/// Create the default null logger
/// </summary>
/// <param name="level"></param>
/// <param name="console"></param>
public GrpcNullLogger(GrpcLoggingLevel level, LocalConsole? console = null) : base(level, console)
{
}

/// <summary>
/// Log
/// </summary>
/// <param name="level"></param>
/// <param name="format"></param>
/// <param name="args"></param>
public override void Log(GrpcLoggingLevel level, string format, params object[] args)
{

}

/// <summary>
/// Log
/// </summary>
/// <param name="level"></param>
/// <param name="message"></param>
public override void Log(GrpcLoggingLevel level, string message)
{

}
}

/// <summary>
/// Log Level
/// </summary>
public enum GrpcLoggingLevel
{
/// <summary>
/// None
/// </summary>
None,
/// <summary>
/// Warn
/// </summary>
Warn,
/// <summary>
/// Verbose
/// </summary>
Verbose
}
16 changes: 16 additions & 0 deletions src/Spark.Connect.Dotnet/Spark.Connect.Dotnet/Grpc/Logger.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
namespace Spark.Connect.Dotnet.Grpc;

/// <summary>
/// Logger
/// </summary>
public class Logger
{
private static int _level;

/// <summary>
/// Create the logger at whatever level you like
/// </summary>
/// <param name="level"></param>
public Logger(int level)
{
_level = level;
}

/// <summary>
/// Log
/// </summary>
/// <param name="message"></param>
public static void WriteLine(string message)
{
if (_level > 1)
Expand All @@ -17,6 +28,11 @@ public static void WriteLine(string message)
}
}

/// <summary>
/// Log
/// </summary>
/// <param name="message"></param>
/// <param name="parameters"></param>
public static void WriteLine(string message, params object[] parameters)
{
if (_level > 1)
Expand Down
Loading
Loading