
Commit

code function docs (#47)
GoEddie authored Dec 4, 2024
1 parent cd8c07f commit 0ad5e00
Showing 22 changed files with 1,558 additions and 95 deletions.
@@ -1,7 +1,15 @@
namespace Spark.Connect.Dotnet.Databricks;

/// <summary>
/// Are we connecting to Databricks? Used to control whether we wait for a connection.
/// </summary>
public class IsDatabricks
{
/// <summary>
/// Validates whether the URL contains the name 'databricks'.
/// </summary>
/// <param name="url">The connection URL to check.</param>
/// <returns>True if the URL contains 'databricks' (case-insensitive); otherwise false.</returns>
public static bool Url(string url)
{
return url.Contains("databricks", StringComparison.OrdinalIgnoreCase);
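As a rough usage sketch (the URL here is made up for illustration):

```csharp
// Decide whether to wait for the cluster before connecting.
var url = "sc://adb-1234567890123456.7.azuredatabricks.net:443";

if (IsDatabricks.Url(url))
{
    // e.g. poll until the Databricks cluster is warm before sending requests
}
```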
102 changes: 102 additions & 0 deletions src/Spark.Connect.Dotnet/Spark.Connect.Dotnet/Grpc/GrpcInternal.cs
@@ -7,8 +7,23 @@

namespace Spark.Connect.Dotnet.Grpc;

/// <summary>
/// This is the class used to pass messages down the gRPC channel.
/// </summary>
public static class GrpcInternal
{
/// <summary>
/// Asks the server to explain the plan and returns the explain string.
/// </summary>
/// <param name="client">The Spark Connect gRPC client.</param>
/// <param name="sessionId">The id of the current session.</param>
/// <param name="plan">The plan to explain.</param>
/// <param name="headers">The gRPC metadata headers to send with the request.</param>
/// <param name="userContext">The user context for the request.</param>
/// <param name="clientType">The client type string.</param>
/// <param name="explainExtended">Whether to request an extended explain.</param>
/// <param name="mode">The explain mode, if any.</param>
/// <returns>The explain output as a string.</returns>
public static string Explain(SparkConnectService.SparkConnectServiceClient client, string sessionId, Plan plan,
Metadata headers, UserContext userContext, string clientType, bool explainExtended, string? mode)
{
@@ -34,6 +49,13 @@ public static string Explain(SparkConnectService.SparkConnectServiceClien
return analyzeResponse.Explain.ExplainString;
}
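A minimal sketch of a call site, assuming the client, session id, plan, headers, user context, and client type all come from an already-connected session (the "extended" mode string is illustrative):

```csharp
var explainString = GrpcInternal.Explain(
    client, sessionId, plan, headers, userContext, clientType,
    explainExtended: true, mode: "extended");

Console.WriteLine(explainString);   // the server-rendered plan explanation
```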

/// <summary>
/// Persists the relation at the requested storage level.
/// </summary>
/// <param name="session">The active Spark session.</param>
/// <param name="relation">The relation to persist.</param>
/// <param name="storageLevel">The storage level to persist at.</param>
/// <returns>The same relation, once the persist request has been sent.</returns>
public static Relation Persist(SparkSession session, Relation relation, StorageLevel storageLevel)
{
var analyzeRequest = new AnalyzePlanRequest
@@ -49,6 +71,18 @@ public static Relation Persist(SparkSession session, Relation relation, StorageL
return relation;
}
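A hedged sketch of persisting a relation, assuming an existing session, relation, and StorageLevel instance:

```csharp
// The persist request is sent to the server; the same relation comes back.
var cached = GrpcInternal.Persist(spark, relation, storageLevel);
```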

/// <summary>
/// Retrieves the schema of the plan.
/// </summary>
/// <param name="client">The Spark Connect gRPC client.</param>
/// <param name="sessionId">The id of the current session.</param>
/// <param name="plan">The plan whose schema is requested.</param>
/// <param name="headers">The gRPC metadata headers to send with the request.</param>
/// <param name="userContext">The user context for the request.</param>
/// <param name="clientType">The client type string.</param>
/// <param name="explainExtended">The extended explain flag.</param>
/// <param name="mode">The explain mode.</param>
/// <returns>The schema as a DataType.</returns>
public static DataType Schema(SparkConnectService.SparkConnectServiceClient client, string sessionId, Plan plan,
Metadata headers, UserContext userContext, string clientType, bool explainExtended, string mode)
{
@@ -65,6 +99,11 @@ public static DataType Schema(SparkConnectService.SparkConnectServiceClie
return analyzeResponse.Schema.Schema_;
}
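A sketch of fetching a schema, with the same assumed call-site variables as Explain above (the empty mode string is a placeholder):

```csharp
var schema = GrpcInternal.Schema(
    client, sessionId, plan, headers, userContext, clientType,
    explainExtended: false, mode: "");

Console.WriteLine(schema);   // a DataType describing the plan's output columns
```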

/// <summary>
/// What is the Spark version you are connected to?
/// </summary>
/// <param name="session">The active Spark session.</param>
/// <returns>The Spark version string.</returns>
public static string Version(SparkSession session)
{
var analyzeRequest = new AnalyzePlanRequest
@@ -76,6 +115,12 @@ public static string Version(SparkSession session)
return analyzeResponse.SparkVersion.Version;
}
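For example, assuming a connected session called spark:

```csharp
var version = GrpcInternal.Version(spark);
Console.WriteLine($"Connected to Spark {version}");   // e.g. "3.5.0"
```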

/// <summary>
/// Gets a list of the input files behind the plan.
/// </summary>
/// <param name="session">The active Spark session.</param>
/// <param name="plan">The plan to inspect.</param>
/// <returns>The paths of the plan's input files.</returns>
public static IEnumerable<string> InputFiles(SparkSession session, Plan plan)
{
var analyzeRequest = new AnalyzePlanRequest
@@ -91,6 +136,12 @@ public static IEnumerable<string> InputFiles(SparkSession session, Plan plan)
return analyzeResponse.InputFiles.Files.Select(p => p);
}
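A sketch of listing the files behind a plan, assuming spark and plan already exist:

```csharp
foreach (var file in GrpcInternal.InputFiles(spark, plan))
{
    Console.WriteLine(file);   // one path per file backing the plan
}
```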

/// <summary>
/// Is it a local plan?
/// </summary>
/// <param name="session">The active Spark session.</param>
/// <param name="plan">The plan to inspect.</param>
/// <returns>True if the plan is local; otherwise false.</returns>
public static bool IsLocal(SparkSession session, Plan plan)
{
var analyzeRequest = new AnalyzePlanRequest
@@ -106,6 +157,13 @@ public static bool IsLocal(SparkSession session, Plan plan)
return analyzeResponse.IsLocal.IsLocal_;
}

/// <summary>
/// Gets the tree string representation of the relation.
/// </summary>
/// <param name="session">The active Spark session.</param>
/// <param name="relation">The relation to describe.</param>
/// <param name="level">An optional depth to truncate the tree at.</param>
/// <returns>The tree string.</returns>
public static string TreeString(SparkSession session, Relation relation, int? level = null)
{
var analyzeRequest = new AnalyzePlanRequest
@@ -129,6 +187,12 @@ public static string TreeString(SparkSession session, Relation relation, int? le
return analyzeResponse.TreeString.TreeString_;
}
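A sketch, assuming an existing session and relation; the level argument truncates the printed tree:

```csharp
Console.WriteLine(GrpcInternal.TreeString(spark, relation));      // full tree
Console.WriteLine(GrpcInternal.TreeString(spark, relation, 2));   // two levels only
```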

/// <summary>
/// Creates a semantic hash of the relation.
/// </summary>
/// <param name="session">The active Spark session.</param>
/// <param name="relation">The relation to hash.</param>
/// <returns>The semantic hash.</returns>
public static int SemanticHash(SparkSession session, Relation relation)
{
var analyzeRequest = new AnalyzePlanRequest
@@ -148,6 +212,12 @@ public static int SemanticHash(SparkSession session, Relation relation)
return analyzeResponse.SemanticHash.Result;
}

/// <summary>
/// What is the storage level of the relation?
/// </summary>
/// <param name="session">The active Spark session.</param>
/// <param name="relation">The relation to inspect.</param>
/// <returns>The relation's current storage level.</returns>
public static StorageLevel StorageLevel(SparkSession session, Relation relation)
{
var analyzeRequest = new AnalyzePlanRequest
@@ -164,6 +234,12 @@ public static StorageLevel StorageLevel(SparkSession session, Relation relation)
return analyzeResponse.GetStorageLevel.StorageLevel;
}

/// <summary>
/// Is it a streaming plan?
/// </summary>
/// <param name="session">The active Spark session.</param>
/// <param name="plan">The plan to inspect.</param>
/// <returns>True if the plan is streaming; otherwise false.</returns>
public static bool IsStreaming(SparkSession session, Plan plan)
{
var analyzeRequest = new AnalyzePlanRequest
@@ -179,6 +255,13 @@ public static bool IsStreaming(SparkSession session, Plan plan)
return analyzeResponse.IsStreaming.IsStreaming_;
}
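The inspection helpers read naturally together; a sketch assuming existing spark, plan, and relation variables:

```csharp
var isLocal = GrpcInternal.IsLocal(spark, plan);           // does the plan run locally?
var isStreaming = GrpcInternal.IsStreaming(spark, plan);   // does it read a streaming source?
var level = GrpcInternal.StorageLevel(spark, relation);    // the current cache level
```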

/// <summary>
/// Do the two relations have the same semantics? Uses an AnalyzePlanRequest.
/// </summary>
/// <param name="session">The active Spark session.</param>
/// <param name="target">The first relation to compare.</param>
/// <param name="other">The second relation to compare.</param>
/// <returns>True if the relations are semantically equal.</returns>
public static bool SameSemantics(SparkSession session, Relation target, Relation other)
{
var analyzeRequest = new AnalyzePlanRequest
@@ -201,6 +284,12 @@ public static bool SameSemantics(SparkSession session, Relation target, Relation
return analyzeResponse.SameSemantics.Result;
}
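SameSemantics and SemanticHash pair naturally; a sketch assuming two existing relations:

```csharp
// Two plans that produce the same result should compare equal
// and hash to the same value.
if (GrpcInternal.SameSemantics(spark, target, other))
{
    var hash = GrpcInternal.SemanticHash(spark, target);
    Console.WriteLine($"Plans are equivalent, hash {hash}");
}
```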

/// <summary>
/// Unsets a config option.
/// </summary>
/// <param name="session">The active Spark session.</param>
/// <param name="key">The config key to unset.</param>
/// <exception cref="SparkException">Thrown if the request fails.</exception>
public static async Task ExecUnSetConfigCommandResponse(SparkSession session, string key)
{
var configRequest = new ConfigRequest
@@ -245,6 +334,12 @@ AsyncUnaryCall<ConfigResponse> Exec()
}
}

/// <summary>
/// Sets one or more config items.
/// </summary>
/// <param name="session">The active Spark session.</param>
/// <param name="options">The config key/value pairs to set.</param>
/// <exception cref="SparkException">Thrown if the request fails.</exception>
public static async Task ExecSetConfigCommandResponse(SparkSession session, IDictionary<string, string> options)
{
var configRequest = new ConfigRequest
@@ -295,6 +390,13 @@ AsyncUnaryCall<ConfigResponse> Exec()
}
}

/// <summary>
/// Gets all config items, optionally filtered by a key prefix.
/// </summary>
/// <param name="session">The active Spark session.</param>
/// <param name="prefix">An optional key prefix to filter on.</param>
/// <returns>A dictionary of config keys and values.</returns>
/// <exception cref="SparkException">Thrown if the request fails.</exception>
public static async Task<Dictionary<string, string>> ExecGetAllConfigCommandResponse(SparkSession session,
string? prefix = null)
{
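A sketch tying the three config helpers together, assuming a connected session; the config key is illustrative:

```csharp
// Set, read back, and then unset a config value.
await GrpcInternal.ExecSetConfigCommandResponse(spark,
    new Dictionary<string, string> { ["spark.sql.shuffle.partitions"] = "8" });

var all = await GrpcInternal.ExecGetAllConfigCommandResponse(spark, "spark.sql");
foreach (var (key, value) in all)
{
    Console.WriteLine($"{key} = {value}");
}

await GrpcInternal.ExecUnSetConfigCommandResponse(spark, "spark.sql.shuffle.partitions");
```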
50 changes: 50 additions & 0 deletions src/Spark.Connect.Dotnet/Spark.Connect.Dotnet/Grpc/GrpcLogger.cs
@@ -1,10 +1,18 @@
namespace Spark.Connect.Dotnet.Grpc;

/// <summary>
/// Used to log gRPC info; needs migrating to a better logger.
/// </summary>
public class GrpcLogger
{
private readonly GrpcLoggingLevel _level;
private readonly LocalConsole _console;

/// <summary>
/// Creates a logger; pass in your own console if you want to redirect output (like in tests!).
/// </summary>
/// <param name="level">The logging threshold.</param>
/// <param name="console">An optional console to write to.</param>
public GrpcLogger(GrpcLoggingLevel level, LocalConsole? console = null)
{
_level = level;
@@ -13,6 +21,11 @@ public GrpcLogger(GrpcLoggingLevel level, LocalConsole? console = null)
}


/// <summary>
/// Logs the message if its level meets the configured threshold.
/// </summary>
/// <param name="level">The level of this message.</param>
/// <param name="message">The message to write.</param>
public virtual void Log(GrpcLoggingLevel level, string message)
{
if (level >= _level)
@@ -21,6 +34,12 @@ public virtual void Log(GrpcLoggingLevel level, string message)
}
}

/// <summary>
/// Logs a formatted message if its level meets the configured threshold.
/// </summary>
/// <param name="level">The level of this message.</param>
/// <param name="format">The message format string.</param>
/// <param name="args">The arguments for the format string.</param>
public virtual void Log(GrpcLoggingLevel level, string format, params object[] args)
{
if (level >= _level)
@@ -30,26 +49,57 @@ public virtual void Log(GrpcLoggingLevel level, string format, params object[] a
}
}
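Usage is straightforward; a message is written when its level is at or above the logger's configured threshold (per the `level >= _level` check above):

```csharp
var logger = new GrpcLogger(GrpcLoggingLevel.Warn);

logger.Log(GrpcLoggingLevel.Warn, "request took {0}ms", 42);   // written
logger.Log(GrpcLoggingLevel.None, "never shown");              // dropped
```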

/// <summary>
/// The default logger that does nothing
/// </summary>
public class GrpcNullLogger : GrpcLogger
{
/// <summary>
/// Creates the default null logger.
/// </summary>
/// <param name="level">The logging threshold (nothing is ever written).</param>
/// <param name="console">An optional console (never written to).</param>
public GrpcNullLogger(GrpcLoggingLevel level, LocalConsole? console = null) : base(level, console)
{
}

/// <summary>
/// Does nothing.
/// </summary>
/// <param name="level">Ignored.</param>
/// <param name="format">Ignored.</param>
/// <param name="args">Ignored.</param>
public override void Log(GrpcLoggingLevel level, string format, params object[] args)
{

}

/// <summary>
/// Does nothing.
/// </summary>
/// <param name="level">Ignored.</param>
/// <param name="message">Ignored.</param>
public override void Log(GrpcLoggingLevel level, string message)
{

}
}

/// <summary>
/// The logging levels used by the gRPC logger.
/// </summary>
public enum GrpcLoggingLevel
{
/// <summary>
/// No logging.
/// </summary>
None,
/// <summary>
/// Warning-level logging.
/// </summary>
Warn,
/// <summary>
/// Verbose logging.
/// </summary>
Verbose
}
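A sketch of swapping in the null logger where a GrpcLogger is expected:

```csharp
// A null logger swallows everything regardless of level.
GrpcLogger logger = new GrpcNullLogger(GrpcLoggingLevel.Verbose);
logger.Log(GrpcLoggingLevel.Verbose, "this goes nowhere");
```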
16 changes: 16 additions & 0 deletions src/Spark.Connect.Dotnet/Spark.Connect.Dotnet/Grpc/Logger.cs
@@ -1,14 +1,25 @@
namespace Spark.Connect.Dotnet.Grpc;

/// <summary>
/// A simple console logger gated by a numeric level.
/// </summary>
public class Logger
{
private static int _level;

/// <summary>
/// Creates the logger at whatever level you like.
/// </summary>
/// <param name="level">The logging level; messages are only written when this is greater than 1.</param>
public Logger(int level)
{
_level = level;
}

/// <summary>
/// Writes the message to the console when the configured level is greater than 1.
/// </summary>
/// <param name="message">The message to write.</param>
public static void WriteLine(string message)
{
if (_level > 1)
@@ -17,6 +28,11 @@ public static void WriteLine(string message)
}
}

/// <summary>
/// Writes a formatted message to the console when the configured level is greater than 1.
/// </summary>
/// <param name="message">The message format string.</param>
/// <param name="parameters">The arguments for the format string.</param>
public static void WriteLine(string message, params object[] parameters)
{
if (_level > 1)
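A sketch of the intended use; note that the level is stored in a static field, so constructing one Logger configures the static WriteLine methods globally:

```csharp
var unused = new Logger(2);                   // level 2: messages will be written

Logger.WriteLine("connected");                // written because the level is > 1
Logger.WriteLine("retry {0} of {1}", 1, 3);   // written, with string formatting
```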
