[RFC] Join support in OpenSearch #15185

Open
rishabhmaurya opened this issue Aug 9, 2024 · 23 comments
Labels
enhancement, Roadmap:Search, Search:Query Capabilities, v2.18.0, v2.19.0


@rishabhmaurya
Contributor

rishabhmaurya commented Aug 9, 2024

Is your feature request related to a problem? Please describe

I think native support for JOIN queries in OpenSearch could help many users who run OpenSearch for observability use cases. Below is a proposal for how we can implement it while making the best use of the underlying Lucene and OpenSearch architecture. Whether you think it's a good or a bad idea, please share your thoughts, along with your opinion on the overall approach we are heading toward.

Describe the solution you'd like

This is a proposal for an execution framework that supports JOIN across indexes using the OpenSearch query DSL. The focus of this document is to design all logical components, their contracts, and their interactions.

Assumptions

  1. The execution framework will not make any query planning decisions. It assumes the query planner will provide an optimized execution plan for the JOIN query.
  2. Integration with existing SQL-based libraries for JOIN execution, like Datafusion, is out of scope (this might be a wrong assumption, but this proposal doesn't incorporate or compare this alternative, which could be a separate RFC?).
  3. Query languages like SQL and PPL, and their corresponding plugins, will use the new query DSL format to benefit from native JOIN support.

Motivation for native support

  • Better performance and resource utilization than the SQL plugin or any plugin-based implementation.
  • Address the constraints (see the Appendix section) of the existing parent-child join and nested field type.
  • No dependence on other query languages to handle join queries.

Proposal

Design considerations

The execution framework should have well-defined logical components. Each component should be modular, with well-defined abstractions and dependencies. For interoperability, it should be possible to rewrite any component in a different language without impacting other modules, except for the mechanism of communication between them. Each module should be pluggable and extensible to support future use cases, such as data sources other than Lucene, different implementations of the join algorithm, serialization protocols, etc.

Performance considerations

Performance of a JOIN query will depend heavily on the efficiency of both the logical and physical plans, which are out of scope for the query execution framework. For the execution framework, the key performance considerations are:

  • Minimizing bytes transferred over network: Reducing the volume of rows and bytes transferred per row across the network.
  • Efficient implementation of JOIN algorithms: Implementing JOIN algorithms (e.g., hash, merge, index join) ensuring efficient use of CPU, memory, and I/O resources across the cluster to avoid bottlenecks.
  • Resource isolation and circuit breakers: Sandboxing resources for different components to prevent resource contention and ensure stable performance under various loads.
  • Concurrent execution of independent subtasks wherever applicable.
  • Request caching wherever applicable.

For the rest of the proposal, we will use the following two indexes and SQL JOIN query:

left_index: logs
  • message: text
  • status_code: keyword
  • @timestamp: date
  • instance_id: keyword

right_index: instance_details
  • instance_id: keyword
  • region: keyword
  • created_at: date

SQL JOIN query

SELECT   
    id.region,   
    l.status_code,   
    COUNT(*) AS status_code_count  
FROM   
    logs l  
JOIN   
    instance_details id   
ON   
    l.instance_id = id.instance_id  
WHERE   
    l.message LIKE '%error%'   
    AND l."@timestamp" >= NOW() - INTERVAL '1 HOUR'
    AND id.created_at >= NOW() - INTERVAL '1 YEAR'  
GROUP BY   
    id.region,   
    l.status_code

SQL query output

| region | status_code | status_code_count |
|--------|-------------|-------------------|
| IAD    | 500         | 1                 |
| ORD    | 404         | 2                 |

Proposed Query DSL specification

/logs/_search
{  
  "query": {  
    "bool": {  
      "filter": [  
        {  
          "range": {  
            "@timestamp": {  
              "gte": "now-1h"  
            }  
          }  
        },  
        {  
          "match": {  
            "message": "error"  
          }  
        }  
      ]  
    }  
  },  
  "fields": ["instance_id", "status_code"],  
  "join": {  
    "right_query": {  
        "index": "instance_details",   
        "query": {  
          "range": {  
            "created_at": {  
              "gte": "now-1y"  
            }  
          }  
        },  
        "fields": ["instance_id", "region"]  
    },  
    "type": "inner",   
    "algorithm": "hash_join", // optional  
    "condition": {  
        "left_field": "instance_id",  
        "right_field": "instance_id",  
        "comparator": "="  
    },  
    "fields": ["region", "status_code"],  
    "aggs": {  
      "by_region": {  
        "terms": {  
          "field": "region"  
        },  
        "aggs": {  
          "by_status_code": {  
            "terms": {  
              "field": "status_code"  
            },  
            "aggs": {  
              "status_code_count": {  
                "value_count": {  
                  "field": "status_code"  
                }  
              }  
            }  
          }  
        }  
      }  
    }  
  }  
}

Proposed output:

{
  "join_hits": {
    "total": 3,
    "hits": [
      {
        "fields": {
          "status_code": "500",
          "region": "IAD"
        }
      },
      {
        "fields": {
          "status_code": "404",
          "region": "ORD"
        }
      },
      {
        "fields": {
          "status_code": "404",
          "region": "ORD"
        }
      }
    ],
    "aggregations": {
      "by_region": {
        "buckets": [
          {
            "key": "IAD",
            "doc_count": 1,
            "by_status_code": {
              "buckets": [
                {
                  "key": "500",
                  "doc_count": 1
                }
              ]
            }
          },
          {
            "key": "ORD",
            "doc_count": 2,
            "by_status_code": {
              "buckets": [
                {
                  "key": "404",
                  "doc_count": 2
                }
              ]
            }
          }
        ]
      }
    }
  }
}

Supported Join field type and comparators [P0]: 

| Supported type    | Comparators               |
|-------------------|---------------------------|
| keyword           | "="                       |
| long/float/double | "<", ">", "<=", ">=", "=" |
| date              | "<", ">", "<=", ">=", "=" |

Supported Join type:
Only inner join.

Supported aggregation type:
Bucket aggregation: terms, range aggregation.
Metric aggregation: Some decomposable aggregations will be supported on joined rows, like min, max, count, sum, and avg.

Future enhancements:

  1. We may support other types of join, like left outer, right outer, etc.
  2. We may support nested join queries to join across more than two indexes, by letting users add a "join" inside the "right_query" section.
  3. Derived fields may optionally be defined for both the left and right index queries and used in the join condition. Additionally, we may support derived fields that combine fields from the left and right index.
  4. We may support using aggregation buckets from left_index and right_index as the key and value of the rows to perform the join on.
  5. We may support complex join conditions using expressions instead of predefined operators.
  6. We may support other bucket and metric aggregation types already supported in OpenSearch.
  7. We may support ordering of the final join results.
  8. We may support scoring and retrieving results from the left and right index ordered by their score. When combining join results, we could combine their scores using either a custom scorer or the way we combine BM25 scores across shards at the coordinator today.

Components

To understand the various components, let’s look at a possible logical plan tree for this query:
[Figure: Logical Plan Tree]

1. Source Reader

Purpose
Read rows from the data source. It can use an index or simply scan all rows, depending on the query passed to it. It doesn’t try to optimize the query; it blindly executes the query passed to it at initialization time. It must support pagination and produce rows efficiently in batches.

For a Lucene-based implementation, SourceReader will have access to the corresponding shard, which is a Lucene index, and will execute the given Lucene query. It will use a custom Collector to collect documents and generate rows containing the docID (optionally) and the desired fields to fetch.

Properties
Type: Lucene
Source identifier: Shard ID
Input
Query: lucene query for lucene based implementation
Pagination info: page size
Fields: fields to fetch
Output
Iterator of matching rows. A row is a tuple of <docID, f1, f2, f3>. The output here is a non-serialized iterator; for a Java implementation, it will be a new Iterator class with methods like nextPage(), which fetches all rows in the next page.
Note: It is the responsibility of the stream to consume this iterator and serialize it to send over the network if needed.
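A minimal Java sketch of the SourceReader contract, assuming an in-memory list of documents in place of a Lucene shard (doc ID = list index) and a `Predicate` in place of the Lucene query. `MatchedRow` and the constructor shape are hypothetical; only `nextPage()` comes from the description above. A real implementation would drive a custom Lucene `Collector` instead of a loop.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical row: doc ID plus the projected field values.
record MatchedRow(int docId, Map<String, Object> fields) {}

// Sketch of a page-oriented SourceReader over an in-memory "shard".
final class SourceReader {
    private final List<Map<String, Object>> shard;       // doc ID == list index (assumption)
    private final Predicate<Map<String, Object>> query;  // stands in for a Lucene query
    private final List<String> fields;                   // fields to fetch
    private final int pageSize;
    private int nextDoc = 0;                              // next doc ID to examine

    SourceReader(List<Map<String, Object>> shard, Predicate<Map<String, Object>> query,
                 List<String> fields, int pageSize) {
        if (pageSize <= 0) throw new IllegalArgumentException("pageSize must be positive");
        this.shard = shard;
        this.query = query;
        this.fields = fields;
        this.pageSize = pageSize;
    }

    // Returns up to pageSize matching rows in doc ID order; an empty page means exhausted.
    List<MatchedRow> nextPage() {
        List<MatchedRow> page = new ArrayList<>(pageSize);
        while (nextDoc < shard.size() && page.size() < pageSize) {
            Map<String, Object> doc = shard.get(nextDoc);
            if (query.test(doc)) {
                Map<String, Object> projected = new LinkedHashMap<>();
                for (String field : fields) projected.put(field, doc.get(field));
                page.add(new MatchedRow(nextDoc, projected));
            }
            nextDoc++;
        }
        return page;
    }
}
```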

2. Stream

Purpose
The purpose of a stream is to transmit rows in batches from one component to another. It makes other components agnostic of the physical location of the rows and keeps them independent of each other, which keeps the system modular.

A stream can employ one of several RPC mechanisms, such as TransportAction-based, gRPC streams, or Netty streams. It is the responsibility of a stream to serialize rows at the source and deserialize them at the sink. For encoding and decoding, it can choose from various supported protocols like JSON (or Protobuf in the future). It preserves the order of the rows through serialization and deserialization. At the source, it uses the Iterator interface to fetch the next batch of rows, serializes them, and transmits them to the sink, where it deserializes them so that its consumer can use the same Iterator interface to consume rows.

Properties
RPC mechanism: e.g. gRPC/Transport Actions based. 
Protocol: e.g. Proto/Json/Apache Arrow
Sort order key: optional. It doesn’t sort the rows and this property is more like a tag which its consumer can make use of. 
Input
Iterator of rows
Output
Iterator of rows
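To illustrate the Stream contract (order-preserving serialization at the source, deserialization at the sink, an Iterator on both ends), here is a sketch with a hand-rolled length-prefixed encoding from `java.io`. The encoding, `KeyRow`, and `RowBatchCodec` are assumptions standing in for JSON, Protobuf, or Arrow; `transmit` simulates the network hop in-process rather than using TransportAction, gRPC, or Netty.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Minimal row carried between components: doc ID plus join key (illustrative only).
record KeyRow(int docId, String joinKey) {}

// Hypothetical length-prefixed batch codec; stands in for JSON/Protobuf/Arrow.
final class RowBatchCodec {
    private RowBatchCodec() {}

    // Source side: serialize one batch, writing rows in their original order.
    static byte[] encode(List<KeyRow> batch) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(batch.size());
            for (KeyRow row : batch) {
                out.writeInt(row.docId());
                out.writeUTF(row.joinKey());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Sink side: deserialize rows in the same order they were written.
    static List<KeyRow> decode(byte[] payload) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
            int count = in.readInt();
            List<KeyRow> rows = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                rows.add(new KeyRow(in.readInt(), in.readUTF()));
            }
            return rows;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Simulated transport: one frame per source batch, exposed to the consumer as an Iterator.
    static Iterator<KeyRow> transmit(Iterator<List<KeyRow>> sourceBatches) {
        List<byte[]> frames = new ArrayList<>();
        sourceBatches.forEachRemaining(batch -> frames.add(encode(batch)));
        List<KeyRow> received = new ArrayList<>();
        for (byte[] frame : frames) received.addAll(decode(frame));
        return received.iterator();
    }
}
```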

3. Collator

Purpose
It collates one or more streams into one, maintaining the ordering guarantees of the input streams. If there are no ordering constraints, it simply interleaves the different iterators, ensuring all iterators are consumed at the same rate.

If ordering is specified, there are two cases: the input streams are either ordered or unordered. If they are ordered, it can simply merge the streams while maintaining the order. If they are unordered, it could be a heavy-duty operation, as it needs to fetch all rows from the different streams and sort them.

The Join executor can make efficient use of the Collator and its ordering guarantees while performing the JOIN operation. The Collator can also be used to combine iterators from multiple shards of the same index. However, choosing the right collator and join algorithm is part of query planning.

Properties
Sort key and order
Input
List of Iterators
Output
Iterator
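The two Collator behaviors described above can be sketched as follows: a k-way merge with a min-heap when the inputs are already ordered, and round-robin interleaving when there is no ordering constraint. `RowCollator` is a hypothetical name, and the sketch materializes its output into a `List` for brevity, whereas the component described here would expose an Iterator. The case of unordered inputs with a required ordering (fetch everything, then sort) is omitted.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

final class RowCollator {
    private RowCollator() {}

    // Ordered inputs: k-way merge using a min-heap keyed on each input's current head.
    static <T> List<T> mergeSorted(List<Iterator<T>> inputs, Comparator<? super T> order) {
        List<T> heads = new ArrayList<>();
        for (int i = 0; i < inputs.size(); i++) heads.add(null);
        // The heap holds input indexes; an index is only in the heap while its head is set.
        PriorityQueue<Integer> heap =
                new PriorityQueue<>((a, b) -> order.compare(heads.get(a), heads.get(b)));
        for (int i = 0; i < inputs.size(); i++) {
            if (inputs.get(i).hasNext()) {
                heads.set(i, inputs.get(i).next());
                heap.add(i);
            }
        }
        List<T> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int i = heap.poll();
            out.add(heads.get(i));
            if (inputs.get(i).hasNext()) {  // refill from the same input and re-enter the heap
                heads.set(i, inputs.get(i).next());
                heap.add(i);
            }
        }
        return out;
    }

    // No ordering constraint: round-robin so every input is consumed at the same rate.
    static <T> List<T> interleave(List<Iterator<T>> inputs) {
        List<T> out = new ArrayList<>();
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            for (Iterator<T> it : inputs) {
                if (it.hasNext()) {
                    out.add(it.next());
                    progressed = true;
                }
            }
        }
        return out;
    }
}
```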

4. Join Executor

Purpose 
Perform the join on the left and right input streams, given a join field. It assumes the left stream is the one with more matching rows; it is the query planner's job to swap left and right if necessary, based on cardinality estimates. As part of the query execution plan, the join executor is initialized with the provided JOIN algorithm.

Properties
Join algorithm: Hash, SortMerge, Broadcast, Index. 
Input
Left Iterator
Right Iterator
Join field
Join condition
Output
Iterator of joined rows with the requested fields

5. Aggregator (Internal)

Purpose
Performs both bucket and metric aggregations on the input Iterator. It transforms the input Iterator into a dummy ValueSource which the BucketCollector of a given aggregation type can use. Instead of a DocIdSetIterator, this customized BucketCollector will directly consume the input Iterator rows and fetch field values using its dummy ValueSource.
This approach has the benefit of easily reusing OpenSearch's rich aggregation framework.
Input
AggregationQuery
Iterator of rows
Output
InternalAggregation
In the future, InternalAggregation could use the same serialization protocol that Streams use, to improve performance.
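As an illustration of what Aggregator (Internal) computes for the example query, the sketch below builds the `by_region` buckets, each with nested `by_status_code` counts, directly from joined rows. It does not go through OpenSearch's `BucketCollector` or a dummy `ValueSource` as proposed above; it only shows the expected result of a two-level terms aggregation with a per-bucket count. `JoinedRowAggregator` is a hypothetical name.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

final class JoinedRowAggregator {
    private JoinedRowAggregator() {}

    // Two-level terms aggregation with a count per inner bucket over joined rows.
    // Rows missing either field are skipped, as terms buckets ignore missing values.
    static Map<String, Map<String, Long>> nestedTermsCount(
            Iterator<Map<String, String>> rows, String outerField, String innerField) {
        Map<String, Map<String, Long>> buckets = new TreeMap<>();
        while (rows.hasNext()) {
            Map<String, String> row = rows.next();
            String outer = row.get(outerField);
            String inner = row.get(innerField);
            if (outer == null || inner == null) continue;
            buckets.computeIfAbsent(outer, k -> new TreeMap<>()).merge(inner, 1L, Long::sum);
        }
        return buckets;
    }
}
```

Fed the three joined rows from the proposed output, it yields IAD/500 with a count of 1 and ORD/404 with a count of 2.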

6. Aggregator (Reduce)

This is the same as the aggregation reduce phase, which reduces the results from different shards at the coordinator.
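For nested terms buckets like the ones in the example, the reduce step amounts to summing doc counts per key across the partial results from each node. A minimal sketch, assuming each node returns the same nested-map shape (outer key, then inner key, then count); `BucketReducer` is a hypothetical name, not OpenSearch's `InternalAggregation` reduce API.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class BucketReducer {
    private BucketReducer() {}

    // Merge per-node partial buckets by summing the counts of identical (outer, inner) keys.
    static Map<String, Map<String, Long>> reduce(List<Map<String, Map<String, Long>>> partials) {
        Map<String, Map<String, Long>> reduced = new TreeMap<>();
        for (Map<String, Map<String, Long>> partial : partials) {
            partial.forEach((outerKey, innerBuckets) ->
                    innerBuckets.forEach((innerKey, count) ->
                            reduced.computeIfAbsent(outerKey, k -> new TreeMap<>())
                                   .merge(innerKey, count, Long::sum)));
        }
        return reduced;
    }
}
```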

Note: Fields and their type definition are available to all components.

Physical plan

Let’s use the same query defined above and create a few possible execution plans using the components defined above. Here we assume both the left and right index are entirely present locally. The distributed physical plan takes care of the distributed nature of indexes and shards and builds on top of the physical plan.

Hash Join
The Join executor will build a hash table on the join key from the right index (assuming the cardinality of the join key on the right is lower than on the left and the hash table fits in memory) and will iterate over all rows of the left index to perform the join.

The hash table could be built at indexing time, similar to how it can be done to optimize cardinality aggregation or to build eager_global_ordinals for keyword fields.

Here a row doesn’t need all projection fields, just the join field and matching docIDs. This can save bytes transferred over the network. Any subsequent component that needs other projection fields can request them lazily using the docIDs and the shard identifier. This will be part of the query plan.
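A sketch of the hash join described above, assuming rows carry only the join key and a doc ID so that projection fields can be fetched lazily afterwards. The build side is the right (assumed smaller) input and the probe side is the left input; the output is pairs of doc IDs. `KeyedDoc`, `JoinedPair`, and `HashJoin` are hypothetical names, and memory limits and circuit breakers are not modeled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

record KeyedDoc(String key, int docId) {}
record JoinedPair(int leftDocId, int rightDocId) {}

final class HashJoin {
    private HashJoin() {}

    static List<JoinedPair> innerJoin(Iterator<KeyedDoc> left, Iterator<KeyedDoc> right) {
        // Build: hash table from join key to right-side doc IDs (assumed to fit in memory).
        Map<String, List<Integer>> table = new HashMap<>();
        while (right.hasNext()) {
            KeyedDoc r = right.next();
            table.computeIfAbsent(r.key(), k -> new ArrayList<>()).add(r.docId());
        }
        // Probe: stream the left side once and emit one pair per matching right doc.
        List<JoinedPair> out = new ArrayList<>();
        while (left.hasNext()) {
            KeyedDoc l = left.next();
            for (int rightDocId : table.getOrDefault(l.key(), List.of())) {
                out.add(new JoinedPair(l.docId(), rightDocId));
            }
        }
        return out;
    }
}
```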

Sort-merge Join
It assumes the rows from both the left and right index are ordered by the join key. This is a memory-efficient algorithm, as the Join executor doesn't need to hold either input in memory.
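A matching sketch of the sort-merge join, assuming both inputs are sorted ascending by join key. Lists stand in for ordered streams; a streaming version would only need to buffer the current run of equal keys on the right side, which is what handles duplicate keys on both sides. `KeyedDoc`, `JoinedPair`, and `SortMergeJoin` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

record KeyedDoc(String key, int docId) {}
record JoinedPair(int leftDocId, int rightDocId) {}

final class SortMergeJoin {
    private SortMergeJoin() {}

    // Both inputs must be sorted ascending by key.
    static List<JoinedPair> innerJoin(List<KeyedDoc> left, List<KeyedDoc> right) {
        List<JoinedPair> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < left.size() && j < right.size()) {
            int cmp = left.get(i).key().compareTo(right.get(j).key());
            if (cmp < 0) {
                i++;                       // left key too small: advance left
            } else if (cmp > 0) {
                j++;                       // right key too small: advance right
            } else {
                String key = left.get(i).key();
                int runEnd = j;            // find the run of equal keys on the right
                while (runEnd < right.size() && right.get(runEnd).key().equals(key)) runEnd++;
                // Pair every left row with this key against the whole right run.
                while (i < left.size() && left.get(i).key().equals(key)) {
                    for (int k = j; k < runEnd; k++) {
                        out.add(new JoinedPair(left.get(i).docId(), right.get(k).docId()));
                    }
                    i++;
                }
                j = runEnd;
            }
        }
        return out;
    }
}
```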

A possible physical plan:
Let’s assume the left index has 2 shards (SL1, SL2) and the right index has 1 shard (SR1), and the left index has more matching rows than the right index. The Collator will ensure ordering guarantees while collating SL1 and SL2. The stream’s rows will contain just instance_id and docIDs until the Join. The Aggregator will request the region and status_code fields from the source reader for the given docIDs and shardID.
[Figure: SortMerge Join]

Distributed physical plan

Let’s assume the left index has 4 shards (SL1, SL2, SL3, SL4) and the right index has 2 shards (SR1, SR2), with SL1, SL2, and SR1 on node 1 and SL3, SL4, and SR2 on node 2.

A possible distributed physical plan, derived from the previous physical plan:

  1. Use Collator to collate SL1, SL2 on node 1 and SL3, SL4 on node 2.
  2. Use a broadcast join to broadcast all rows from SR2 to node 1 and from SR1 to node 2. Alternatively, broadcast the hash tables built from SR2 and SR1 and use a hash join if the hash tables are under the allowed limit.
  3. Perform Join on node 1 and node 2.
  4. Aggregate results on node 1 and node 2.
  5. Reduce results on the coordinator node; let’s assume it’s node 1.

[Figure: Distributed HashJoin]

We can derive some simple rules for the query planner to begin with:

  1. Always collate all local shards of an index before performing the JOIN. The downside relates to concurrency: joining individual, smaller shards would produce more join operations overall, but those could be executed in parallel.
  2. Always perform the JOIN before aggregation. However, it might be possible to push aggregation down before the JOIN for decomposable aggregation functions like min, max, sum, and count, which can save a lot of bytes transmitted.
  3. Lazily fetch non-join projection fields.
  4. Always perform Aggregation (Internal) on the same node on which the JOIN was performed.
  5. Perform just one Aggregation (Reduce), on the coordinator node, as is done today.
  6. Always perform a broadcast join, broadcasting the smaller table when its size is below some threshold.
  7. Always use a sort-merge join when the index sort is on the join key.
  8. If document routing is based on a JOIN field, make use of it to retrieve only
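Rules 6 and 7, together with the Milestone 1 requirement that the smaller table stay under a predefined threshold, could be encoded as a tiny rule-based planner. This is a hypothetical sketch: the inputs (matching-row estimates, whether the index sort is on the join key) are assumed to be available, the threshold is illustrative rather than an existing OpenSearch setting, and treating "broadcast" as the data-movement decision and sort-merge vs. hash as the local algorithm is one interpretation of how the two rules combine.

```java
// Hypothetical local join algorithms run after the smaller side has been broadcast.
enum LocalJoinAlgorithm { SORT_MERGE, HASH }

// Which index gets broadcast, and how the join runs locally on each node.
record JoinPlan(String broadcastIndex, LocalJoinAlgorithm algorithm) {}

final class RuleBasedJoinPlanner {
    private RuleBasedJoinPlanner() {}

    static JoinPlan plan(String leftIndex, long leftMatchingRows,
                         String rightIndex, long rightMatchingRows,
                         boolean indexSortedOnJoinKey, long maxBroadcastRows) {
        // Rule 6: broadcast the smaller side (ties go to the right side).
        String smaller = leftMatchingRows < rightMatchingRows ? leftIndex : rightIndex;
        long smallerRows = Math.min(leftMatchingRows, rightMatchingRows);
        if (smallerRows > maxBroadcastRows) {
            // Without a real query planner, reject joins whose smaller side is too large.
            throw new IllegalArgumentException("smaller side has " + smallerRows
                    + " matching rows; broadcast limit is " + maxBroadcastRows);
        }
        // Rule 7: sort-merge when the index sort is on the join key, otherwise hash join.
        LocalJoinAlgorithm algorithm =
                indexSortedOnJoinKey ? LocalJoinAlgorithm.SORT_MERGE : LocalJoinAlgorithm.HASH;
        return new JoinPlan(smaller, algorithm);
    }
}
```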

More details will follow in the sections below, along with the low-level design of the above components and other join algorithms.

Fault tolerance

Resource isolation and circuit Breakers 

Concurrency

Asynchronous execution

Shard allocation algorithm across indexes 

Query caching

Appendix

Join using nested and Join field type

When documents have a parent-child/nested relationship, one can use nested fields or a join field to index and query the documents from a single index.
Caveats

  1. Documents must have a parent-child relationship, so it’s a one-to-many relationship. Many-to-many relationships aren't supported; in that case, they have to be flattened to one-to-many.
  2. The parent-child relationship needs to be established at index time and cannot be done at query time, so it’s less flexible.
  3. All documents are stored in the same index, so parent and child documents cannot be scaled independently.
  4. When getting, deleting, or updating a document, a routing value needs to be provided, since both parent and child must be in the same shard.

Benefits

  1. Good performance: Since parent and child are co-located in the same shard, queries are faster, as no cross-index/shard communication is needed. Also, global ordinals are computed for the join field of parent and child, which optimizes the query even further.
  2. Aligns with the OpenSearch philosophy of a shard being capable of handling all queries for the documents residing in it. So no explicit changes were required to query interfaces, except to support new query types.

I have put together this proposal after a few rounds of discussion and help from @msfroh @jainankitk @harshavamsi @getsaurabh02

Related component

Search:Query Capabilities

Describe alternatives you've considered

No response

Additional context

No response

@harshavamsi
Contributor

Thanks for this RFC @rishabhmaurya, exciting stuff ahead!

Before I provide some comments, I do want to point out that this line points to links that are not accessible to the community.

Address the constraints in existing parent-child join and nested field type.

@msfroh
Collaborator

msfroh commented Aug 9, 2024

Integration with existing SQL based libraries for JOIN execution like Datafusion is out of scope (this might be a wrong assumption, but this proposal doesn’t incorporate and compares this alternative, which could be a separate RFC?).

I think a prototype built on Datafusion could be interesting, though I suspect that even building a prototype would take a few months. The main effort, AFAIK, would be making OpenSearch data nodes write responses in Arrow format (since, AFAIK, that's what Datafusion assumes for the wire protocol).

That said, I also suspect (based on the work with Protobuf here and here) that using Arrow for results would noticeably improve latency, so that part of the prototype could be very useful in the long run and wouldn't be throwaway work.

@msfroh
Collaborator

msfroh commented Aug 9, 2024

With regard to query planning, in the long run, we'll need to think about cost/cardinality estimation.

With predicates pushed down to data nodes, we could get cost estimates from Lucene by summing per-segment cost estimates for the per-shard query.

I have a half-baked idea for improving some of those estimates that I need to validate before bringing it to the Lucene community. (The basic idea is that Lucene estimates $\mathrm{cost}(A \land B) = \min(\mathrm{cost}(A), \mathrm{cost}(B))$. If we chunk the doc ID space and compute doc frequencies for each chunk, a better estimate is $\mathrm{cost}(A \land B) = \sum_{c \in \mathrm{chunks}} \min(\mathrm{cost}(A, c), \mathrm{cost}(B, c))$.) I think with BPReordering (which, AFAIK, helps cluster things into "highly correlated" and "highly uncorrelated" chunks), a chunk-based cost estimate could be much more accurate.
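A small worked example of the two estimates, assuming per-chunk doc frequencies for each clause are available (the arrays are hypothetical input; Lucene does not expose per-chunk doc frequencies today). Both values are upper bounds on the size of A ∩ B, and because a sum of per-chunk minimums can never exceed the minimum of the sums, the chunked estimate is always at least as tight.

```java
import java.util.Arrays;

final class ChunkedCostEstimate {
    private ChunkedCostEstimate() {}

    // Conjunction estimate from total clause costs: the smaller of the two totals.
    static long globalEstimate(long[] costAByChunk, long[] costBByChunk) {
        return Math.min(Arrays.stream(costAByChunk).sum(), Arrays.stream(costBByChunk).sum());
    }

    // Chunked estimate: sum over chunks of the per-chunk minimum.
    static long chunkedEstimate(long[] costAByChunk, long[] costBByChunk) {
        if (costAByChunk.length != costBByChunk.length) {
            throw new IllegalArgumentException("chunk counts differ");
        }
        long total = 0;
        for (int c = 0; c < costAByChunk.length; c++) {
            total += Math.min(costAByChunk[c], costBByChunk[c]);
        }
        return total;
    }
}
```

For clauses whose matches mostly sit in different chunks, e.g. per-chunk costs A = {100, 0, 50} and B = {0, 100, 50}, the global estimate is 150 while the chunked estimate is 50.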

@msfroh
Collaborator

msfroh commented Aug 9, 2024

It assumes the rows are ordered by join key in both left and right index. This is a memory efficient algorithm as nothing needs to be kept in-memory in Join executor.

I have another half-baked idea that I need to validate, where I think we could add a "secondary sort" to a Lucene index. The challenge is evaluating whatever (non-join) match predicate we want to push down to the shards such that results are matched/returned sorted by the join key.

Roughly, we compute doc IDs sorted by another field and keep a map from these "fake" doc IDs back to the real doc IDs. To be able to run queries (with forward-only DocIdSetIterators), we would need to write an extra set of postings using the new ordering (but could hopefully reuse the term dictionary), and maybe write an extra copy of the doc values index (.dvi), but could reuse the doc values data (.dvd). Then I think you could do a sort-merge join over this secondary sort (but return tuples from _source lookups on the original doc IDs). This assumes that the extra postings (especially if we ignore frequencies and positions) and doc value indices (i.e., doc -> ordinals) are a lot smaller than the other structures (like the _source, term dictionary, doc value dictionary (i.e., ordinal -> value)).
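The fake-to-real doc ID mapping can be sketched with in-memory arrays, assuming a single-valued, non-null join key per document. In the idea above, the extra postings would already be written in fake-ID order so a forward-only iterator yields join-key order directly; this sketch instead sorts a predicate's matches in the fake ID space to show the same effect. `SecondarySortMap` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.Comparator;

final class SecondarySortMap {
    private SecondarySortMap() {}

    // fakeToReal[fakeDocId] = real doc ID, with fake IDs assigned in join-key order.
    static int[] fakeToReal(String[] joinKeyByRealDoc) {
        Integer[] realDocs = new Integer[joinKeyByRealDoc.length];
        for (int d = 0; d < realDocs.length; d++) realDocs[d] = d;
        // Arrays.sort on objects is stable, so equal keys keep real doc ID order.
        Arrays.sort(realDocs, Comparator.comparing((Integer d) -> joinKeyByRealDoc[d]));
        return Arrays.stream(realDocs).mapToInt(Integer::intValue).toArray();
    }

    // Inverse permutation: realToFake[realDocId] = fake doc ID.
    static int[] invert(int[] fakeToReal) {
        int[] realToFake = new int[fakeToReal.length];
        for (int fake = 0; fake < fakeToReal.length; fake++) realToFake[fakeToReal[fake]] = fake;
        return realToFake;
    }

    // Re-order a predicate's matches (real doc IDs) into join-key order via the fake ID space.
    static int[] inJoinKeyOrder(int[] matchingRealDocs, int[] realToFake, int[] fakeToReal) {
        int[] docs = new int[matchingRealDocs.length];
        for (int i = 0; i < docs.length; i++) docs[i] = realToFake[matchingRealDocs[i]];
        Arrays.sort(docs);                                          // ascending fake IDs
        for (int i = 0; i < docs.length; i++) docs[i] = fakeToReal[docs[i]];  // back to real IDs
        return docs;
    }
}
```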

In database terms, I think it would be similar to adding an index on a foreign key.

Note that we can efficiently do a page-by-page (not streaming) merge-sort between two indices when the join field is numeric and we have a BKD tree, even without an index sort, because we can already retrieve chunks of docs over join key ranges.

@msfroh
Collaborator

msfroh commented Aug 9, 2024

Another possible "off-the-shelf" component that we should definitely consider is the existing OpenSearch SQL plugin.

While it was designed with SQL as the front-end language, it has most of the elements of a sophisticated query engine.

@smacrakis
Copy link

The SQL plugin has very uninformative error messages, which is a problem if it's going to be used by humans (and not machine-generated code).

@rishabhmaurya
Contributor Author

@msfroh

If we chunk the doc ID space and compute doc frequencies for each chunk, a better estimate is $\mathrm{cost}(A \land B) = \sum_{c \in \mathrm{chunks}} \min(\mathrm{cost}(A, c), \mathrm{cost}(B, c))$.

I didn't quite get the idea behind "chunking the doc ID space", and I'm guessing you're referring to just text-based fields, since for numeric fields we have BKD point estimation, which is already quite accurate. We use docFreq from TermsEnum to compute the cost of an individual term, so are you proposing something like public abstract int docFreq(chunkID)?

I think with BPReordering (that AFAIK helps cluster things into "highly correlated" and "highly uncorrelated" chunks), a chunk-based cost estimate could be much more accurate.

BPReorderer combines multiple fields into one to compute the gap between two docs, so again I'm not sure how it's going to help. However, it would be a very creative use of the reordered doc ID space for better cardinality estimation of boolean term queries if we can find such heuristics.

@rishabhmaurya
Contributor Author

rishabhmaurya commented Aug 12, 2024

The challenge is evaluating whatever (non-join) match predicate we want to push down to the shards such that results are matched/returned sorted by the join key.

If I'm not wrong, a DISI guarantees that docs are traversed in doc ID order, so if docs are sorted by the join field, it should work regardless of the other predicates?

I like this idea of a secondary index with a different sort key. It will add to the index size but might be worth it for some users. Alternatively, we could have a materialized view for the join table, populated in the background by some form of index management job. But then we need to take care of deleted and updated documents.

@rishabhmaurya
Contributor Author

rishabhmaurya commented Aug 12, 2024

Another possible "off-the-shelf" component that we should definitely consider is the existing OpenSearch SQL plugin.

We need to see what we can reuse from it, but it would mostly be on the query planner side. Given that it accepts SQL as the query language, converts it into an IR, and then does logical and physical planning, I'm not sure of its utility with the OpenSearch query DSL and the distributed physical plan we want to create, which knows details about shards, their locations, fields, and cardinality, and fetches fields lazily as needed. Also, we have to be careful about converting the query DSL into other form(s) and the latency impact of doing so.
It reminds me of this comment and the research paper referenced here. But we are taking it as an action item to evaluate the possibility of using the existing query planner for JOINs.

@getsaurabh02 getsaurabh02 moved this to Now (This Quarter) in Performance Roadmap Aug 12, 2024
@getsaurabh02 getsaurabh02 moved this from Now (This Quarter) to In Progress in Performance Roadmap Aug 12, 2024
@rishabhmaurya
Contributor Author

Meeting notes
Attendees: @jainankitk @msfroh @penghuo @rishabhmaurya @YANG-DB

  1. Discussed the feasibility of using Apache Calcite for query planning and using its execution plan in OpenSearch. There is an adapter for Elasticsearch, which could be a good starting point for integration.
  2. Discussed the possibility of using Datafusion for both query planning and execution of JOIN queries.
  3. Discussed the types of use cases users have: 1) well-defined fields and queries on which the JOIN operation is performed, mostly observability and security analytics users; 2) ad hoc JOIN queries across indexes.
  4. For 1, we can start by building a projection of the join table at index time or in the background.
  5. Discussed aggregation and ordering in JOIN queries.

Next steps:

  1. Check the feasibility of integrating Apache Calcite and also Datafusion, and possibly come up with an estimate of the effort and overall impact in terms of performance and our long-term vision.
  2. For the MVP, can we start by just building a projection of the join table at index time and providing query DSL support for JOIN on top of it?

@mch2 mch2 removed the untriaged label Aug 14, 2024
@getsaurabh02 getsaurabh02 added the Roadmap:Search Project-wide roadmap label label Aug 19, 2024
@rishabhmaurya
Contributor Author

rishabhmaurya commented Aug 21, 2024

We had a few sync-ups on it and have decided to proceed with Milestone 1 below, which is a bare-bones implementation of JOIN.

From Milestone 2 onward, we will work toward a composable query engine and are willing to invest in integrating Apache Arrow and Flight into OpenSearch and using them for the JOIN implementation. We will start with a POC for it, and on the QueryPlanner side, we will start with a POC to integrate DataFusion for both planning and JOIN execution.

Feel free to comment for any feedback. If you feel positive and excited about this initiative, feel free to post ideas and contribute.


Tech stack

Rows format: Apache Arrow
Stream: Apache Arrow Flight
Query planning: DataFusion
Join execution: DataFusion
Aggregation: Both OpenSearch and DataFusion

Milestone 1: Bare bones JOIN

  • Support JOIN query using OpenSearch query DSL.

  • No query planning; execute the input query as provided:

  • Broadcast the smaller table's rows to all nodes where shards of the larger table are present.

  • Always collate all local shards of an index before performing the JOIN. The downside relates to concurrency: joining individual, smaller shards would produce more join operations overall, but those could be executed in parallel.

  • Always perform the JOIN before aggregation. However, it might be possible to push aggregation down before the JOIN for decomposable aggregation functions like min, max, sum, and count, which can save a lot of bytes transmitted.

  • Lazily fetch non-join projection fields.

  • Always perform Aggregation (Internal) on the same node on which the JOIN was performed.

  • Perform just one Aggregation (Reduce), on the coordinator node, as is done today.

  • Always perform a broadcast join, broadcasting the smaller table when its size is below some threshold.

  • Always use a sort-merge join when the index sort is on the join key.

  • If document routing is based on a JOIN field, make use of it to retrieve only. 

  • Use a transport-action-based batch API to stream rows obtained from executing Lucene queries on shards. We probably don’t need a batch API in the initial version, as we will only support joins where the smaller right index can be fetched in one batch.

  • A SourceReader implementation to execute the left and right Lucene queries and generate rows.

  • Native implementation of the JOIN algorithm.

  • The smaller table's matching rows should be fewer than a predefined threshold. This ensures join operations are not very long-running in the initial milestones, when a query planner isn't available.

  • Supported Join field type and comparators: 

| Supported type    | Comparators               |
|-------------------|---------------------------|
| keyword           | "="                       |
| long/float/double | "<", ">", "<=", ">=", "=" |
| date              | "<", ">", "<=", ">=", "=" |
  • Support only Inner join.
  • No support for aggregation. 
  • Implement Circuit breakers.

Milestone 2: Apache Arrow and Flight integration

  • Use Apache Arrow and Parquet-based rows.

  • Use Apache Arrow Flight, which is based on gRPC streams, for streams.

  • Encryption in transit and security considerations.

  • Support the following aggregations:

  • Bucket aggregation: terms, range aggregation.

  • Metric aggregation: Some decomposable aggregations will be supported on joined rows, like min, max, count, sum, and avg.

Milestone 3: Data fusion integration

The DataFusion Rust engine will run on every node in a separate process and will communicate with OpenSearch via APIs and streams.

  • Use DataFusion for query planning and JOIN execution.

Milestone 4: Performance optimization

  • Concurrency within shards and across shards to make JOIN efficient.
  • Resource isolation
  • Asynchronous execution
  • Query caching

Milestone 5: Index time JOIN projections

@penghuo
Contributor

penghuo commented Aug 28, 2024

@rishabhmaurya I understand that the current limitation is that only INNER JOIN is supported, and aggregation is not. I'm unsure whether the effort spent on building INNER JOIN can be leveraged to support more advanced join modes and aggregation. Without building a distributed query engine, do we have a clear path forward?

Have we considered enhancing SQL JOIN performance as an alternative to adding new JOIN grammar in the DSL? I feel that adding new JOIN grammar is a one-way door decision.

@Bukhtawar
Collaborator

Thanks @rishabhmaurya for the proposal. I'm still catching up, so I could be wrong, but I am assuming we aren't changing data formats and this proposal aims at joining data fetched from multiple sources. While this approach should still work, might the cost of stitching and aggregating data across these sources still be high?

@rishabhmaurya
Contributor Author

rishabhmaurya commented Aug 29, 2024

@penghuo thanks for your feedback.

I understand that the current limitation is that only INNER JOIN is supported, and aggregation is not. I'm unsure whether the effort spent on building INNER JOIN can be leveraged to support more advanced join modes and aggregation. Without building a distributed query engine, do we have a clear path forward?

Its just the breakdown of milestone and not limitation in the approach we are taking. Milestone 2 onwards we should be able to support other types of JOINs and aggregations as well. The idea is to create building blocks to make join execution efficient.

Like using Apache arrow vectors as streaming format and using flight streams for RPC will enable performing batched execution of record and vectorized execution due to parquet like in-memory columnar format used in Arrow vectors.

Another building block is treating an OpenSearch index as a distributed table, with each shard as a fragment. The join is performed shard by shard on the node that holds the larger index's shard, while the results of querying the smaller index's shard on a different node are streamed to it.
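The shard-by-shard join described above can be sketched as an in-memory hash join. This is a minimal illustration under stated assumptions, not the proposal's implementation: it assumes the smaller index's shard results arrive as a stream of (join key, doc id) rows that is buffered into a hash table, and that the larger index's local shard is then probed row by row. `KeyedDoc`, `JoinedPair` and `ShardHashJoin` are hypothetical names, and the real design streams Arrow record batches rather than Java objects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical row shape: the JOIN key value and the id of the document it came from.
record KeyedDoc(String joinKey, String docId) {}

// Hypothetical inner-join output: one doc from each side that share a key.
record JoinedPair(String joinKey, String largeDocId, String smallDocId) {}

final class ShardHashJoin {
    private ShardHashJoin() {}

    /**
     * Inner hash join of one shard of the larger index (probe side, read locally)
     * with the rows streamed from one shard of the smaller index (build side, remote).
     */
    static List<JoinedPair> join(Iterator<KeyedDoc> smallShardStream, Iterator<KeyedDoc> largeShardRows) {
        // Build phase: only the smaller side is buffered in memory, grouped by key.
        Map<String, List<String>> buildTable = new HashMap<>();
        while (smallShardStream.hasNext()) {
            KeyedDoc row = smallShardStream.next();
            buildTable.computeIfAbsent(row.joinKey(), k -> new ArrayList<>()).add(row.docId());
        }
        // Probe phase: the larger, local shard is scanned once, row by row.
        List<JoinedPair> out = new ArrayList<>();
        while (largeShardRows.hasNext()) {
            KeyedDoc row = largeShardRows.next();
            List<String> matches = buildTable.get(row.joinKey());
            if (matches == null) {
                continue; // inner join: rows without a match are dropped
            }
            for (String smallDocId : matches) {
                out.add(new JoinedPair(row.joinKey(), row.docId(), smallDocId));
            }
        }
        return out;
    }
}
```

In this sketch only the smaller side is held in memory, so memory grows with the smaller shard's result set; running the join where the larger shard lives also avoids moving the larger data over the network.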

The Lucene result collection strategy described in the SourceReader component will efficiently collect documents, along with their JOIN and projection fields, from Lucene query results into Arrow vectors.
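To make the row-to-columnar step concrete, here is a minimal sketch that transposes row-oriented hits into fixed-size columnar batches. It assumes each hit carries a doc id, one JOIN key field and one projected field; plain Java arrays stand in for Arrow vectors, and `Hit`, `ColumnarBatch` and `BatchBuilder` are hypothetical names, not part of SourceReader or the Arrow Java API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-document hit: doc id plus a JOIN key field and one projected field.
record Hit(int docId, long joinKey, double value) {}

// One column per field; plain Java arrays stand in for Arrow vectors in this sketch.
record ColumnarBatch(int[] docIds, long[] joinKeys, double[] values) {
    int rowCount() {
        return docIds.length;
    }
}

final class BatchBuilder {
    private BatchBuilder() {}

    /** Transposes row-oriented hits into columnar batches of at most batchSize rows. */
    static List<ColumnarBatch> toBatches(List<Hit> hits, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<ColumnarBatch> batches = new ArrayList<>();
        for (int start = 0; start < hits.size(); start += batchSize) {
            int n = Math.min(batchSize, hits.size() - start);
            int[] docIds = new int[n];
            long[] joinKeys = new long[n];
            double[] values = new double[n];
            // Copy each field of the row into its own column.
            for (int i = 0; i < n; i++) {
                Hit hit = hits.get(start + i);
                docIds[i] = hit.docId();
                joinKeys[i] = hit.joinKey();
                values[i] = hit.value();
            }
            batches.add(new ColumnarBatch(docIds, joinKeys, values));
        }
        return batches;
    }
}
```

Bounding the batch size is what lets downstream operators stream results batch by batch instead of materializing the whole result set.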

Making use of the ordering guarantees that Lucene provides for keyword and numeric fields within a segment to produce ordered streams will also help perform JOINs on high-cardinality fields with minimal memory overhead.
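The reason ordered streams reduce memory is that two inputs sorted by the JOIN key can be joined with a sort-merge join, which needs no hash table over either side. The sketch below is a minimal illustration, assuming both sides have already been merged into arrays of numeric keys in ascending order (how per-segment ordering is turned into a global order is not shown). `SortedMergeJoin` is a hypothetical name; a streaming version would buffer only the current run of equal keys instead of indexing into arrays.

```java
import java.util.ArrayList;
import java.util.List;

final class SortedMergeJoin {
    private SortedMergeJoin() {}

    /**
     * Inner join of two key columns, each already sorted ascending by join key.
     * Returns {leftIndex, rightIndex} pairs whose keys match. Apart from the
     * output list, only a few cursor positions are kept.
     */
    static List<int[]> join(long[] left, long[] right) {
        List<int[]> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < left.length && j < right.length) {
            if (left[i] < right[j]) {
                i++; // advance the side with the smaller key
            } else if (left[i] > right[j]) {
                j++;
            } else {
                long key = left[i];
                // Find the end of the run of equal keys on each side.
                int iEnd = i;
                while (iEnd < left.length && left[iEnd] == key) iEnd++;
                int jEnd = j;
                while (jEnd < right.length && right[jEnd] == key) jEnd++;
                // Emit the cross product of the two equal-key runs.
                for (int a = i; a < iEnd; a++) {
                    for (int b = j; b < jEnd; b++) {
                        out.add(new int[] {a, b});
                    }
                }
                i = iEnd;
                j = jEnd;
            }
        }
        return out;
    }
}
```

For example, joining keys `{1, 3, 3, 7}` with `{3, 3, 5, 7}` yields the four pairings of the two `3` runs plus the single `7` match.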

> Have we considered enhancing SQL JOIN performance as an alternative to adding new JOIN grammar in the DSL? I feel that adding new JOIN grammar is a one-way door decision.

None of these capabilities for efficient execution are available in the SQL plugin today. Please correct me if I'm wrong, but the SQL plugin today excels at converting SQL queries into OpenSearch scroll-based search requests, treating the OpenSearch cluster as a client.

Yes, it's a one-way door decision. Do you see any concerns with making the JOIN operation a first-class citizen of OpenSearch?

@rishabhmaurya
Contributor Author

rishabhmaurya commented Aug 29, 2024

@Bukhtawar thanks for taking a look.

> I'm still catching up, so I could be wrong, but I am assuming we aren't changing data formats and that this proposal aims at joining data fetched from multiple sources. While this approach should still work, might the cost of stitching and aggregating data across these sources still be high?

We aren't changing the format of data persisted in Lucene as of now. We are converting the results of executing a Lucene query into the Arrow vector format for the efficient in-memory analytics that JOINs require.
In theory, collation and aggregation will be much more efficient on Arrow vectors, thanks to the columnar format and to chunking records into batches, which enables batched execution.
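As a small illustration of why the columnar, chunked layout helps aggregation, the sketch below computes a group-by sum batch by batch, reading only the two columns it needs. It is a hypothetical example (`ColumnarAggregation` and its signature are not from the proposal), uses plain Java arrays instead of Arrow vectors, and shows the data layout rather than explicit SIMD vectorization.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ColumnarAggregation {
    private ColumnarAggregation() {}

    /**
     * Group-by-sum over a sequence of batches. Batch b consists of two parallel
     * columns: keyColumns.get(b)[i] is the group key and valueColumns.get(b)[i]
     * is the value of row i.
     */
    static Map<Long, Double> sumByKey(List<long[]> keyColumns, List<double[]> valueColumns) {
        if (keyColumns.size() != valueColumns.size()) {
            throw new IllegalArgumentException("key and value batch counts differ");
        }
        Map<Long, Double> sums = new HashMap<>();
        for (int b = 0; b < keyColumns.size(); b++) {
            long[] keys = keyColumns.get(b);
            double[] values = valueColumns.get(b);
            if (keys.length != values.length) {
                throw new IllegalArgumentException("column lengths differ in batch " + b);
            }
            // Batch-at-a-time loop over contiguous primitive arrays; other fields are never touched.
            for (int i = 0; i < keys.length; i++) {
                sums.merge(keys[i], values[i], Double::sum);
            }
        }
        return sums;
    }
}
```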

I'm currently working on a prototype that converts Lucene query results into Arrow tables and Flight streams, so more details will follow. Happy to address any early questions and feedback you may have. If you would like to see any comparisons as part of this prototype, we can try to incorporate them.

@penghuo
Contributor

penghuo commented Sep 6, 2024

@LantaoJin is working on JOIN support in the SQL plugin. Can the SQL plugin also leverage the JOIN framework?

@rishabhmaurya
Contributor Author

Yes. The initial JOIN support may not be as exhaustive in terms of feature set as what the SQL plugin supports, but the plan is to support those features incrementally.

@LantaoJin
Member

LantaoJin commented Sep 9, 2024

Thanks @penghuo for looping me in. I am working on supporting native Join in PPL (from another point of view, supporting Join SQL queries in the v2 engine of the SQL plugin). I share the same concern about how this Join framework will benefit the SQL plugin in the future. From the milestones list, the core goal is delivering Join support through the DSL interface. Are we targeting a fully functional Join in the DSL or a limited one? I would like to see some milestones about delivering a high-performance framework itself and a solid Java API that the SQL plugin can leverage, since most customer requests for Join come from SQL and PPL.

@rishabhmaurya
Contributor Author

rishabhmaurya commented Sep 9, 2024

@LantaoJin SQL, PPL and OpenSearch Query DSL are just language frontends. The core part of milestone 1 is to implement an efficient "Source Reader", a "Stream" implementation, and a basic JOIN algorithm that treats each shard as a fragment of a distributed table. This will enable SQL, PPL and Query DSL to make use of these Java APIs.
It is a bit unclear from milestone 1 above, but we have parked integration with Query DSL as the last part of milestone 1, and we think it's the lowest-hanging fruit of all the components.

In the upcoming weeks, the Java APIs will be well defined, and a stream-based implementation will be available in a feature branch. That's when we can collaborate on how to integrate this framework with SQL and PPL.

@msfroh is also thinking about introducing an OpenSearch AST that SQL, PPL and Query DSL can use directly, but its specification is not yet defined, and milestone 1 will be independent of it.

@rishabhmaurya
Contributor Author

rishabhmaurya commented Sep 9, 2024

@LantaoJin I was thinking that if you are planning to implement a broadcast join in the plugin, maybe we can port that logic (just the execution, not the planner) into core once it is integrated with this framework.

@YANG-DB
Member

YANG-DB commented Sep 9, 2024

@LantaoJin @rishabhmaurya I really like this approach. IMO the initial join (the first phase) will just use the existing DSL; once the API supports the native join directive, the join would be pushed down to the engine itself.
Another idea is to add an engine query-planning strategy for joins based on the expected cardinality. Are we planning this as well?

@rishabhmaurya
Contributor Author

Thanks @YANG-DB for your feedback.

Yes, @jainankitk is working on the query planner and can provide details. He was exploring one of the existing well-established query-planner libraries but ran into challenges treating a shard as a table for cardinality estimation and building a distributed plan around it.
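To illustrate the kind of cardinality-driven decision being discussed, here is a toy heuristic that picks a join strategy from estimated row counts. Everything in it is an assumption for illustration: the strategy names, the broadcast threshold and the rules are hypothetical and do not describe the planner @jainankitk is working on.

```java
final class JoinStrategyChooser {
    // Hypothetical strategies a planner could pick between.
    enum Strategy { BROADCAST_HASH, SORT_MERGE, PARTITIONED_HASH }

    private JoinStrategyChooser() {}

    /**
     * Picks a join strategy from estimated row counts of the two inputs.
     * The threshold and the rules are illustrative only.
     */
    static Strategy choose(long leftRows, long rightRows, boolean bothSortedOnKey, long broadcastThreshold) {
        long smaller = Math.min(leftRows, rightRows);
        if (smaller <= broadcastThreshold) {
            // Small side is cheap to ship to every shard of the large side.
            return Strategy.BROADCAST_HASH;
        }
        if (bothSortedOnKey) {
            // Both inputs already ordered by key: merge them without a hash table.
            return Strategy.SORT_MERGE;
        }
        // Otherwise repartition both sides by key and hash-join each partition.
        return Strategy.PARTITIONED_HASH;
    }

    /** For hash strategies, build the in-memory table on the side estimated to be smaller. */
    static boolean buildOnLeft(long leftRows, long rightRows) {
        return leftRows <= rightRows;
    }
}
```

The hard part the comment points to is producing the row estimates themselves when each shard is treated as a table; the heuristic above only shows how such estimates would be consumed.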

@LantaoJin
Member

> In the upcoming weeks, the Java APIs will be well defined, and a stream-based implementation will be available in a feature branch. That's when we can collaborate on how to integrate this framework with SQL and PPL.

Sounds good!

> @LantaoJin I was thinking that if you are planning to implement a broadcast join in the plugin, maybe we can port that logic (just the execution, not the planner) into core once it is integrated with this framework.

No, the join in the plugin is still a coordinating-node join, so I am looking forward to the Java APIs above.

@getsaurabh02 added the v2.18.0 label (Issues and PRs related to version 2.18.0) Oct 7, 2024
@getsaurabh02 added the v2.19.0 label (Issues and PRs related to version 2.19.0) Oct 21, 2024
@getsaurabh02 moved this from In Progress to Todo in Performance Roadmap Dec 9, 2024
Projects
Status: New
Status: Todo
Status: 🆕 New
Development

No branches or pull requests

10 participants