Skip to content

Commit

Permalink
add documentation and fix IT tests
Browse files Browse the repository at this point in the history
Signed-off-by: YANGDB <[email protected]>
  • Loading branch information
YANG-DB committed Nov 7, 2024
1 parent 1a34544 commit a4992e8
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 5 deletions.
4 changes: 3 additions & 1 deletion docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,15 @@ source = table | where ispresent(a) |

- `source=accounts | rare gender`
- `source=accounts | rare age by gender`
- `source=accounts | rare age by gender sample(50 percent)`

#### **Top**
[See additional command details](ppl-top-command.md)

- `source=accounts | top gender`
- `source=accounts | top 1 gender`
- `source=accounts | top 1 age by gender`
- `source=accounts | top 5 gender sample(50 percent)`
- `source=accounts | top 5 age by gender`

#### **Parse**
[See additional command details](ppl-parse-command.md)
Expand Down
10 changes: 9 additions & 1 deletion docs/ppl-lang/ppl-rare-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ Using ``rare`` command to find the least common tuple of values of all fields in
**Note**: A maximum of 10 results is returned for each distinct tuple of values of the group-by fields.

**Syntax**
`rare <field-list> [by-clause]`
`rare <field-list> [by-clause] [sample(? percent)]`

* field-list: mandatory. comma-delimited list of field names.
* by-clause: optional. one or more fields to group the results by.
* sample: optional. allows reducing the amount of fields being scanned using table sample strategy favour velocity over precision


### Example 1: Find the least common values in a field
Expand Down Expand Up @@ -44,3 +45,10 @@ PPL query:
| M | 33 |
| M | 36 |
+----------+-------+

### Example 3: Find the least common values using 50 % sampling strategy

PPL query:

os> source=accounts | rare age sample(50 percent);
fetched rows / total rows = 2/4
12 changes: 11 additions & 1 deletion docs/ppl-lang/ppl-top-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ Using ``top`` command to find the most common tuple of values of all fields in t


### Syntax
`top [N] <field-list> [by-clause]`
`top [N] <field-list> [by-clause] [sample(? percent)]`

* N: number of results to return. **Default**: 10
* field-list: mandatory. comma-delimited list of field names.
* by-clause: optional. one or more fields to group the results by.
* sample: optional. allows reducing the amount of fields being scanned using table sample strategy favour velocity over precision


### Example 1: Find the most common values in a field
Expand Down Expand Up @@ -56,3 +57,12 @@ PPL query:
| M | 32 |
+----------+-------+

## Example 2: Find the most common values organized by gender using sample strategy

The example finds most common age of all the accounts group by gender sample only 50 % of rows.

PPL query:

os> source=accounts | top 1 age by gender sample(50 percent);
fetched rows / total rows = 1/2

Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,52 @@ class FlintSparkPPLTopAndRareITSuite
val expectedPlan = Project(projectList, sortedPlan)
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("create ppl rare address field query test sample 75 %") {
val frame = sql(s"""
| source = $testTable| rare address sample(75 percent)
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
assert(results.length == 3)

val expectedRow = Row(1, "Vancouver")
assert(
results.head == expectedRow,
s"Expected least frequent result to be $expectedRow, but got ${results.head}")

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
val addressField = UnresolvedAttribute("address")
val projectList: Seq[NamedExpression] = Seq(UnresolvedStar(None))

val aggregateExpressions = Seq(
Alias(
UnresolvedFunction(Seq("COUNT"), Seq(addressField), isDistinct = false),
"count_address")(),
addressField)

val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val aggregatePlan =
Aggregate(
Seq(addressField),
aggregateExpressions,
Sample(0, 0.75, withReplacement = false, 0, table))
val sortedPlan: LogicalPlan =
Sort(
Seq(
SortOrder(
Alias(
UnresolvedFunction(Seq("COUNT"), Seq(addressField), isDistinct = false),
"count_address")(),
Ascending)),
global = true,
aggregatePlan)
val expectedPlan = Project(projectList, sortedPlan)
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("create ppl rare address by age field query test") {
val frame = sql(s"""
Expand Down Expand Up @@ -111,11 +157,61 @@ class FlintSparkPPLTopAndRareITSuite
"count_address")()

val aggregateExpressions = Seq(countExpr, addressField, ageAlias)
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val aggregatePlan =
Aggregate(
Seq(addressField, ageAlias),
aggregateExpressions,
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))
table)

val sortedPlan: LogicalPlan =
Sort(
Seq(
SortOrder(
Alias(
UnresolvedFunction(Seq("COUNT"), Seq(addressField), isDistinct = false),
"count_address")(),
Ascending)),
global = true,
aggregatePlan)

val expectedPlan = Project(projectList, sortedPlan)
comparePlans(expectedPlan, logicalPlan, false)
}

test("create ppl rare address by age field query test sample 75 %") {
val frame = sql(s"""
| source = $testTable| rare address by age sample(75 percent)
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
assert(results.length == 4)

val expectedRow = Row(1, "Vancouver", 60)
assert(
results.head == expectedRow,
s"Expected least frequent result to be $expectedRow, but got ${results.head}")

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val addressField = UnresolvedAttribute("address")
val ageField = UnresolvedAttribute("age")
val ageAlias = Alias(ageField, "age")()

val projectList: Seq[NamedExpression] = Seq(UnresolvedStar(None))

val countExpr = Alias(
UnresolvedFunction(Seq("COUNT"), Seq(addressField), isDistinct = false),
"count_address")()

val aggregateExpressions = Seq(countExpr, addressField, ageAlias)
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val aggregatePlan =
Aggregate(
Seq(addressField, ageAlias),
aggregateExpressions,
Sample(0, 0.75, withReplacement = false, 0, table))

val sortedPlan: LogicalPlan =
Sort(
Expand Down Expand Up @@ -225,6 +321,46 @@ class FlintSparkPPLTopAndRareITSuite
val expectedPlan = Project(Seq(UnresolvedStar(None)), planWithLimit)
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("create ppl top 3 countries query test sample 75 %") {
val frame = sql(s"""
| source = $newTestTable| top 3 country sample(75 percent)
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
assert(results.length == 3)

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val countryField = UnresolvedAttribute("country")
val countExpr = Alias(
UnresolvedFunction(Seq("COUNT"), Seq(countryField), isDistinct = false),
"count_country")()
val aggregateExpressions = Seq(countExpr, countryField)
val table = UnresolvedRelation(Seq("spark_catalog", "default", "new_flint_ppl_test"))
val aggregatePlan =
Aggregate(
Seq(countryField),
aggregateExpressions,
Sample(0, 0.75, withReplacement = false, 0, table))

val sortedPlan: LogicalPlan =
Sort(
Seq(
SortOrder(
Alias(
UnresolvedFunction(Seq("COUNT"), Seq(countryField), isDistinct = false),
"count_country")(),
Descending)),
global = true,
aggregatePlan)

val planWithLimit =
GlobalLimit(Literal(3), LocalLimit(Literal(3), sortedPlan))
val expectedPlan = Project(Seq(UnresolvedStar(None)), planWithLimit)
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("create ppl top 2 countries by occupation field query test") {
val frame = sql(s"""
Expand Down Expand Up @@ -254,11 +390,56 @@ class FlintSparkPPLTopAndRareITSuite
UnresolvedFunction(Seq("COUNT"), Seq(countryField), isDistinct = false),
"count_country")()
val aggregateExpressions = Seq(countExpr, countryField, occupationFieldAlias)
val table = UnresolvedRelation(Seq("spark_catalog", "default", "new_flint_ppl_test"))
val aggregatePlan =
Aggregate(
Seq(countryField, occupationFieldAlias),
aggregateExpressions,
UnresolvedRelation(Seq("spark_catalog", "default", "new_flint_ppl_test")))
table)

val sortedPlan: LogicalPlan =
Sort(
Seq(
SortOrder(
Alias(
UnresolvedFunction(Seq("COUNT"), Seq(countryField), isDistinct = false),
"count_country")(),
Descending)),
global = true,
aggregatePlan)

val planWithLimit =
GlobalLimit(Literal(3), LocalLimit(Literal(3), sortedPlan))
val expectedPlan = Project(Seq(UnresolvedStar(None)), planWithLimit)
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)

}

test("create ppl top 2 countries by occupation field query test sample 85 %") {
val frame = sql(s"""
| source = $newTestTable| top 3 country by occupation sample(85 percent)
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
assert(results.length == 3)

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val countryField = UnresolvedAttribute("country")
val occupationField = UnresolvedAttribute("occupation")
val occupationFieldAlias = Alias(occupationField, "occupation")()

val countExpr = Alias(
UnresolvedFunction(Seq("COUNT"), Seq(countryField), isDistinct = false),
"count_country")()
val aggregateExpressions = Seq(countExpr, countryField, occupationFieldAlias)
val table = UnresolvedRelation(Seq("spark_catalog", "default", "new_flint_ppl_test"))
val aggregatePlan =
Aggregate(
Seq(countryField, occupationFieldAlias),
aggregateExpressions,
Sample(0, 0.85, withReplacement = false, 0, table))

val sortedPlan: LogicalPlan =
Sort(
Expand Down

0 comments on commit a4992e8

Please sign in to comment.