forked from dotnet/spark
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: Basic.cs
115 lines (90 loc) · 4.12 KB
/
Basic.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using static Microsoft.Spark.Sql.Functions;
namespace Microsoft.Spark.Examples.Sql.Batch
{
/// <summary>
/// A simple example demonstrating basic Spark SQL features.
/// </summary>
/// <summary>
/// A simple example demonstrating basic Spark SQL features.
/// </summary>
internal sealed class Basic : IExample
{
    /// <summary>
    /// Runs the example: reads a people.json file into a DataFrame, then walks
    /// through basic DataFrame operations, SQL queries, UDF registration and
    /// usage (scalar, chained, array- and map-returning), and joins.
    /// </summary>
    /// <param name="args">
    /// Expects exactly one element: the path to
    /// SPARK_HOME/examples/src/main/resources/people.json. Exits with code 1
    /// otherwise.
    /// </param>
    public void Run(string[] args)
    {
        if (args.Length != 1)
        {
            Console.Error.WriteLine(
                "Usage: Basic <path to SPARK_HOME/examples/src/main/resources/people.json>");
            Environment.Exit(1);
        }

        SparkSession spark = SparkSession
            .Builder()
            .AppName("SQL basic example using .NET for Apache Spark")
            .Config("spark.some.config.option", "some-value")
            .GetOrCreate();

        // Need to explicitly specify the schema since pickling vs. arrow formatting
        // will return different types. Pickling will turn longs into ints if the values fit.
        // Same as the "age INT, name STRING" DDL-format string.
        var inputSchema = new StructType(new[]
        {
            new StructField("age", new IntegerType()),
            new StructField("name", new StringType())
        });

        DataFrame df = spark.Read().Schema(inputSchema).Json(args[0]);

        // Microsoft.Spark.Sql.Types is imported at the top of the file, so the
        // partially qualified "Spark.Sql.Types.StructType" is unnecessary.
        StructType schema = df.Schema();
        Console.WriteLine(schema.SimpleString);

        IEnumerable<Row> rows = df.Collect();
        foreach (Row row in rows)
        {
            Console.WriteLine(row);
        }

        // Basic DataFrame operations: projection, column arithmetic,
        // filtering, and aggregation.
        df.Show();
        df.PrintSchema();
        df.Select("name", "age", "age", "name").Show();
        df.Select(df["name"], df["age"] + 1).Show();
        df.Filter(df["age"] > 21).Show();
        df.GroupBy("age")
            .Agg(Avg(df["age"]), Avg(df["age"]), CountDistinct(df["age"], df["age"]))
            .Show();

        df.CreateOrReplaceTempView("people");

        // Registering Udf for SQL expression.
        DataFrame sqlDf = spark.Sql("SELECT * FROM people");
        sqlDf.Show();

        spark.Udf().Register<int?, string, string>(
            "my_udf",
            (age, name) => name + " with " + ((age.HasValue) ? age.Value.ToString() : "null"));

        sqlDf = spark.Sql("SELECT my_udf(*) FROM people");
        sqlDf.Show();

        // Using UDF via data frames.
        Func<Column, Column, Column> addition = Udf<int?, string, string>(
            (age, name) => name + " is " + (age.HasValue ? age.Value + 10 : 0));
        df.Select(addition(df["age"], df["name"])).Show();

        // Chaining example:
        Func<Column, Column> addition2 = Udf<string, string>(str => $"hello {str}!");
        df.Select(addition2(addition(df["age"], df["name"]))).Show();

        // Multiple UDF example:
        df.Select(addition(df["age"], df["name"]), addition2(df["name"])).Show();

        // UDF return type as array.
        Func<Column, Column> udfArray =
            Udf<string, string[]>((str) => new[] { str, str + str });
        df.Select(Explode(udfArray(df["name"]))).Show();

        // UDF return type as map.
        Func<Column, Column> udfMap =
            Udf<string, IDictionary<string, string[]>>(
                (str) => new Dictionary<string, string[]> { { str, new[] { str, str } } });
        df.Select(udfMap(df["name"]).As("UdfMap")).Show(truncate: 50);

        // Joins: on a single column, on multiple columns, and with an
        // explicit join expression and join type.
        DataFrame joinedDf = df.Join(df, "name");
        joinedDf.Show();

        DataFrame joinedDf2 = df.Join(df, new[] { "name", "age" });
        joinedDf2.Show();

        DataFrame joinedDf3 = df.Join(df, df["name"] == df["name"], "outer");
        joinedDf3.Show();

        spark.Stop();
    }
}
}