Skip to content

Commit

Permalink
add sample class
Browse files Browse the repository at this point in the history
Signed-off-by: YANGDB <[email protected]>
  • Loading branch information
YANG-DB committed Nov 7, 2024
1 parent 6e9d485 commit 793c1ad
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.NamedExpression;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.catalyst.plans.logical.Sample;
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias;
import org.apache.spark.sql.catalyst.plans.logical.Union;
import org.apache.spark.sql.types.Metadata;
Expand All @@ -20,8 +21,10 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.Stack;
import java.util.function.BiFunction;
import java.util.function.Function;
Expand All @@ -34,6 +37,7 @@

/**
* The context used for Catalyst logical plan.
* A query which translates into multiple plans (sub-query / join-subQuery / scalar-subQuery) will have multiple contexts
*/
public class CatalystPlanContext {
/**
Expand All @@ -56,6 +60,10 @@ public class CatalystPlanContext {
* The current traversal context the visitor is going through
*/
private Stack<LogicalPlan> planTraversalContext = new Stack<>();
/**
* indicates this plan has to sample the relation rather than scan the entire data set
*/
public Optional<Integer> samplePercentage = Optional.empty();

/**
* NamedExpression contextual parameters
Expand Down Expand Up @@ -129,6 +137,18 @@ public LogicalPlan define(Expression symbol) {
return getPlan();
}

/**
 * Indicate this plan context is using table sampling rather than scanning the entire relation.
 *
 * @param percentage the sampling percentage; must be within (0, 100]
 * @return this context, for fluent chaining
 * @throws IllegalArgumentException if {@code percentage} is outside (0, 100] — a value of 0 or
 *         below would sample nothing, and above 100 is meaningless as a fraction
 */
public CatalystPlanContext withSamplePercentage(int percentage) {
    if (percentage <= 0 || percentage > 100) {
        throw new IllegalArgumentException(
                "Sample percentage must be within (0, 100], got: " + percentage);
    }
    this.samplePercentage = Optional.of(percentage);
    return this;
}

/**
 * @return the sampling percentage when table sampling was requested, otherwise {@link Optional#empty()}
 */
public Optional<Integer> getSamplePercentage() {
    return this.samplePercentage;
}

/**
* append relation to relations list
*
Expand All @@ -140,6 +160,17 @@ public LogicalPlan withRelation(UnresolvedRelation relation) {
return with(relation);
}

/**
 * append sample-relation to relations list
 *
 * <p>The {@code Sample} node wraps the underlying relation; the wrapped child is what gets
 * tracked in the relations list, while the {@code Sample} wrapper itself is pushed onto the plan.
 *
 * @param sampleRelation the Catalyst {@code Sample} logical node wrapping the relation to track
 * @return the resulting logical plan after appending the sample node
 */
public LogicalPlan withSampleRelation(Sample sampleRelation) {
this.relations.add(sampleRelation.child());
return with(sampleRelation);
}

/**
 * register a subquery alias encountered during plan traversal
 *
 * @param subqueryAlias the alias node to track in this context
 */
public void withSubqueryAlias(SubqueryAlias subqueryAlias) {
this.subqueryAlias.add(subqueryAlias);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,6 @@
import org.apache.spark.sql.catalyst.expressions.Descending$;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.GeneratorOuter;
import org.apache.spark.sql.catalyst.expressions.In$;
import org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual;
import org.apache.spark.sql.catalyst.expressions.InSubquery$;
import org.apache.spark.sql.catalyst.expressions.LessThan;
import org.apache.spark.sql.catalyst.expressions.LessThanOrEqual;
import org.apache.spark.sql.catalyst.expressions.ListQuery$;
import org.apache.spark.sql.catalyst.expressions.MakeInterval$;
import org.apache.spark.sql.catalyst.expressions.NamedExpression;
import org.apache.spark.sql.catalyst.expressions.SortDirection;
import org.apache.spark.sql.catalyst.expressions.SortOrder;
Expand All @@ -30,6 +23,7 @@
import org.apache.spark.sql.catalyst.plans.logical.Limit;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.catalyst.plans.logical.Project$;
import org.apache.spark.sql.catalyst.plans.logical.Sample;
import org.apache.spark.sql.execution.ExplainMode;
import org.apache.spark.sql.execution.command.DescribeTableCommand;
import org.apache.spark.sql.execution.command.ExplainCommand;
Expand Down Expand Up @@ -148,10 +142,15 @@ public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) {
DescribeRelation$.MODULE$.getOutputAttrs()));
}
//regular sql algebraic relations
node.getQualifiedNames().forEach(q ->
// Resolving the qualifiedName which is composed of a datasource.schema.table
context.withRelation(new UnresolvedRelation(getTableIdentifier(q).nameParts(), CaseInsensitiveStringMap.empty(), false))
);
node.getQualifiedNames().forEach(q -> {
// Resolving the qualifiedName which is composed of a datasource.schema.table
UnresolvedRelation relation = new UnresolvedRelation(getTableIdentifier(q).nameParts(), CaseInsensitiveStringMap.empty(), false);
if(context.getSamplePercentage().isPresent()) {
context.withSampleRelation(new Sample(0, (double)context.getSamplePercentage().get() / 100, false, 0, relation));
} else {
context.withRelation(relation);
}
});
return context.getPlan();
}

Expand Down Expand Up @@ -327,6 +326,10 @@ public LogicalPlan visitAggregation(Aggregation node, CatalystPlanContext contex
if ((node instanceof TopAggregation) && ((TopAggregation) node).getResults().isPresent()) {
context.apply(p -> (LogicalPlan) Limit.apply(new org.apache.spark.sql.catalyst.expressions.Literal(
((TopAggregation) node).getResults().get().getValue(), org.apache.spark.sql.types.DataTypes.IntegerType), p));
//add sample context (if exists) to the plan context
if(node.getSample().isPresent()) {
context.withSamplePercentage(node.getSample().get().getPercentage());
}
}
return logicalPlan;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.util.List;
import java.util.Optional;
import java.util.Set;

public interface RelationUtils {
/**
Expand Down

0 comments on commit 793c1ad

Please sign in to comment.