Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lookup join #3179

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
Expand All @@ -66,6 +67,7 @@
import org.opensearch.sql.common.antlr.SyntaxCheckException;
import org.opensearch.sql.data.model.ExprMissingValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.expression.DSL;
Expand All @@ -89,6 +91,7 @@
import org.opensearch.sql.planner.logical.LogicalFetchCursor;
import org.opensearch.sql.planner.logical.LogicalFilter;
import org.opensearch.sql.planner.logical.LogicalLimit;
import org.opensearch.sql.planner.logical.LogicalLookup;
import org.opensearch.sql.planner.logical.LogicalML;
import org.opensearch.sql.planner.logical.LogicalMLCommons;
import org.opensearch.sql.planner.logical.LogicalPaginate;
Expand Down Expand Up @@ -507,6 +510,113 @@ public LogicalPlan visitDedupe(Dedupe node, AnalysisContext context) {
consecutive);
}

/** Build {@link LogicalLookup}. */
@Override
public LogicalPlan visitLookup(Lookup node, AnalysisContext queryContext) {
LogicalPlan child = node.getChild().get(0).accept(this, queryContext);
List<Argument> options = node.getOptions();
// Todo, refactor the option.
Boolean appendOnly = (Boolean) options.get(0).getValue().getValue();

Table table =
dataSourceService
.getDataSource(DEFAULT_DATASOURCE_NAME)
.getStorageEngine()
.getTable(null, node.getIndexName());

if (table == null || !table.exists()) {
throw new SemanticCheckException(
String.format("no such lookup index %s", node.getIndexName()));
}

AnalysisContext lookupTableContext = new AnalysisContext();
TypeEnvironment lookupTableEnvironment = lookupTableContext.peek();
table
.getFieldTypes()
.forEach(
(name, type) ->
lookupTableEnvironment.define(new Symbol(Namespace.FIELD_NAME, name), type));
ImmutableMap<ReferenceExpression, ReferenceExpression> matchFieldMap =
analyzeLookupMatchFields(node.getMatchFieldList(), queryContext, lookupTableContext);

return new LogicalLookup(
child,
node.getIndexName(),
matchFieldMap,
appendOnly,
analyzeLookupCopyFields(node.getCopyFieldList(), queryContext, table));
}

private ImmutableMap<ReferenceExpression, ReferenceExpression> analyzeLookupMatchFields(
List<Map> inputMap, AnalysisContext queryContext, AnalysisContext lookupTableContext) {
ImmutableMap.Builder<ReferenceExpression, ReferenceExpression> copyMapBuilder =
new ImmutableMap.Builder<>();
for (Map resultMap : inputMap) {
Expression origin = expressionAnalyzer.analyze(resultMap.getOrigin(), lookupTableContext);
if (resultMap.getTarget() instanceof Field) {
Expression targetExpression =
expressionAnalyzer.analyze(resultMap.getTarget(), queryContext);
ReferenceExpression targetReference =
DSL.ref(targetExpression.toString(), targetExpression.type());
ReferenceExpression originReference = DSL.ref(origin.toString(), origin.type());
TypeEnvironment curEnv = queryContext.peek();
curEnv.remove(originReference);
curEnv.define(targetReference);
copyMapBuilder.put(originReference, targetReference);
} else {
throw new SemanticCheckException(
String.format("the target expected to be field, but is %s", resultMap.getTarget()));
}
}

return copyMapBuilder.build();
}

private ImmutableMap<ReferenceExpression, ReferenceExpression> analyzeLookupCopyFields(
List<Map> inputMap, AnalysisContext context, Table table) {

TypeEnvironment curEnv = context.peek();
java.util.Map<String, ExprType> fieldTypes = table.getFieldTypes();

if (inputMap.isEmpty()) {
fieldTypes.forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v));
return ImmutableMap.<ReferenceExpression, ReferenceExpression>builder().build();
}

ImmutableMap.Builder<ReferenceExpression, ReferenceExpression> copyMapBuilder =
new ImmutableMap.Builder<>();
for (Map resultMap : inputMap) {
if (resultMap.getOrigin() instanceof Field && resultMap.getTarget() instanceof Field) {
String fieldName = ((Field) resultMap.getOrigin()).getField().toString();
ExprType ex = fieldTypes.get(fieldName);

if (ex == null) {
throw new SemanticCheckException(String.format("no such field %s", fieldName));
}

ReferenceExpression origin = new ReferenceExpression(fieldName, ex);

if (resultMap.getTarget().equals(resultMap.getOrigin())) {

curEnv.define(origin);
copyMapBuilder.put(origin, origin);
} else {
ReferenceExpression target =
new ReferenceExpression(((Field) resultMap.getTarget()).getField().toString(), ex);
curEnv.define(target);
copyMapBuilder.put(origin, target);
}
} else {
throw new SemanticCheckException(
String.format(
"the origin and target expected to be field, but is %s/%s",
resultMap.getOrigin(), resultMap.getTarget()));
}
}

return copyMapBuilder.build();
}

/** Logical head is identical to {@link LogicalLimit}. */
public LogicalPlan visitHead(Head node, AnalysisContext context) {
LogicalPlan child = node.getChild().get(0).accept(this, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
Expand Down Expand Up @@ -217,6 +218,10 @@ public T visitDedupe(Dedupe node, C context) {
return visitChildren(node, context);
}

public T visitLookup(Lookup node, C context) {
return visitChildren(node, context);
}

public T visitHead(Head node, C context) {
return visitChildren(node, context);
}
Expand Down
21 changes: 21 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.ast.dsl;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -49,6 +50,7 @@
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.RareTopN;
Expand Down Expand Up @@ -441,6 +443,25 @@ public static Dedupe dedupe(UnresolvedPlan input, List<Argument> options, Field.
return new Dedupe(input, options, Arrays.asList(fields));
}

public static Lookup lookup(
UnresolvedPlan input,
String indexName,
List<Map> matchFieldList,
List<Argument> options,
List<Map> copyFieldList) {
return new Lookup(input, indexName, matchFieldList, options, copyFieldList);
}

public static List<Map> fieldMap(String field, String asField, String... more) {
assert more == null || more.length % 2 == 0;
List list = new ArrayList();
list.add(map(field, asField));
for (int i = 0; i < more.length; i = i + 2) {
list.add(map(more[i], more[i + 1]));
}
return list;
}

public static Head head(UnresolvedPlan input, Integer size, Integer from) {
return new Head(input, size, from);
}
Expand Down
49 changes: 49 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/Lookup.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.expression.Map;

/** AST node represent Lookup operation. */
@Getter
@Setter
@ToString
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
@AllArgsConstructor
public class Lookup extends UnresolvedPlan {
private UnresolvedPlan child;
private final String indexName;
private final List<Map> matchFieldList;
private final List<Argument> options;
private final List<Map> copyFieldList;

@Override
public Lookup attach(UnresolvedPlan child) {
this.child = child;
return this;
}

@Override
public List<UnresolvedPlan> getChild() {
return ImmutableList.of(this.child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitLookup(this, context);
}
}
15 changes: 15 additions & 0 deletions core/src/main/java/org/opensearch/sql/executor/Explain.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.sql.planner.physical.EvalOperator;
import org.opensearch.sql.planner.physical.FilterOperator;
import org.opensearch.sql.planner.physical.LimitOperator;
import org.opensearch.sql.planner.physical.LookupOperator;
import org.opensearch.sql.planner.physical.NestedOperator;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlanNodeVisitor;
Expand Down Expand Up @@ -171,6 +172,20 @@ public ExplainResponseNode visitDedupe(DedupeOperator node, Object context) {
"consecutive", node.getConsecutive())));
}

@Override
public ExplainResponseNode visitLookup(LookupOperator node, Object context) {
return explain(
node,
context,
explainNode ->
explainNode.setDescription(
ImmutableMap.of(
"copyfields", node.getCopyFieldMap().toString(),
"matchfields", node.getMatchFieldMap().toString(),
"indexname", node.getIndexName(),
"appendonly", node.getAppendOnly())));
}

@Override
public ExplainResponseNode visitRareTopN(RareTopNOperator node, Object context) {
return explain(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.logical;

import java.util.Arrays;
import java.util.Map;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.opensearch.sql.expression.ReferenceExpression;

/** Logical Lookup Plan. */
@Getter
@ToString
@EqualsAndHashCode(callSuper = true)
public class LogicalLookup extends LogicalPlan {

private final String indexName;
private final Map<ReferenceExpression, ReferenceExpression> matchFieldMap;
private final Map<ReferenceExpression, ReferenceExpression> copyFieldMap;
private final Boolean appendOnly;

/** Constructor of LogicalLookup. */
public LogicalLookup(
LogicalPlan child,
String indexName,
Map<ReferenceExpression, ReferenceExpression> matchFieldMap,
Boolean appendOnly,
Map<ReferenceExpression, ReferenceExpression> copyFieldMap) {
super(Arrays.asList(child));
this.indexName = indexName;
this.copyFieldMap = copyFieldMap;
this.matchFieldMap = matchFieldMap;
this.appendOnly = appendOnly;
}

@Override
public <R, C> R accept(LogicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitLookup(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,13 @@ public LogicalPlan values(List<LiteralExpression>... values) {
public static LogicalPlan limit(LogicalPlan input, Integer limit, Integer offset) {
return new LogicalLimit(input, limit, offset);
}

public static LogicalPlan lookup(
LogicalPlan input,
String indexName,
Map<ReferenceExpression, ReferenceExpression> matchFieldMap,
Boolean appendOnly,
Map<ReferenceExpression, ReferenceExpression> copyFields) {
return new LogicalLookup(input, indexName, matchFieldMap, appendOnly, copyFields);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public R visitDedupe(LogicalDedupe plan, C context) {
return visitNode(plan, context);
}

public R visitLookup(LogicalLookup plan, C context) {
return visitNode(plan, context);
}

public R visitRename(LogicalRename plan, C context) {
return visitNode(plan, context);
}
Expand Down
Loading
Loading