Avoid record duplication in JOIN processor #1771
Closed
chubei
started this conversation in
Feature Requests
Replies: 0 comments
Avoid record duplication in JOIN processor
Problem
The JOIN processor has to keep track of all its historical inputs, so that when a new record arrives, it can produce the cartesian product of the new record with the history and filter it based on the join condition.
Currently we're storing each history record as `Vec<Field>`. Imagine the case where a JOIN processor (call it JOIN2) uses the output of a previous JOIN processor (call it JOIN1) as an input. JOIN2 will have to store all the records that JOIN1 produced. However, JOIN1 also has all the records in its own history. Effectively, memory usage is doubled.
This gets worse when there are more cascaded JOIN processors.
Solution
We can store a record as `Vec<RefOrField>` instead of `Vec<Field>`. `RefOrField` is an enum that can be either a reference to a record or a direct field.
One may wonder how a `RefOrField` can be more memory efficient than a `Field`, because most fields are small. The point here is that a `RefOrField` references a whole record, so it can reference many fields at once.
The information about which fields are actually referenced in a `RefOrField` is stored in the corresponding `RefSchema`, a referencing version of `Schema`. While `Schema` stores `Vec<FieldDefinition>`, `RefSchema` stores `Vec<RefOrFieldDefinition>`. `RefOrFieldDefinition` is an enum that can be either a reference to a schema together with some of the referee's field definitions, or a direct field definition.
Note that we only allow one level of reference, meaning that all the fields referenced in a `RefOrFieldDefinition::Ref` must be direct field definitions.
Code
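The types described above could look roughly like the following sketch. Several details here are assumptions rather than anything stated in this post: references are modelled with `Arc`, `Field` and `FieldDefinition` are reduced to small stand-ins, the member names `values` and `fields` are made up, and a `Ref` definition stores the positions of the referee's fields instead of copies of their definitions.

```rust
use std::sync::Arc;

/// Stand-in for the real `Field` type; two variants are enough for the sketch.
#[derive(Debug, Clone, PartialEq)]
pub enum Field {
    Int(i64),
    Text(String),
}

/// Stand-in for the real `FieldDefinition`.
#[derive(Debug, Clone, PartialEq)]
pub struct FieldDefinition {
    pub name: String,
}

/// A record column: a shared reference to an upstream record,
/// or a field stored inline.
#[derive(Debug, Clone)]
pub enum RefOrField {
    Ref(Arc<RefRecord>),
    Field(Field),
}

#[derive(Debug, Clone)]
pub struct RefRecord {
    pub values: Vec<RefOrField>,
}

/// A schema column: a reference to an upstream schema plus the positions
/// (hypothetical representation) of the referee's direct fields that are
/// used, or a field definition stored inline.
#[derive(Debug, Clone)]
pub enum RefOrFieldDefinition {
    Ref {
        schema: Arc<RefSchema>,
        fields: Vec<usize>,
    },
    Field(FieldDefinition),
}

#[derive(Debug, Clone)]
pub struct RefSchema {
    pub fields: Vec<RefOrFieldDefinition>,
}
```

With this shape, a downstream record that holds `RefOrField::Ref(Arc::clone(&upstream))` only bumps a reference count; none of the upstream fields are copied.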
Field indexing
To index into such `RefSchema`s and `RefRecord`s, there have to be two levels of indexing. The first level indexes into the `Vec<RefOrFieldDefinition>` or `Vec<RefOrField>`, and the second level indexes into the direct fields of the referenced `RefSchema` or `RefRecord`.
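A minimal sketch of the two-level lookup on records follows. The function name `get_field`, the `(outer, inner)` parameter pair, and the choice that `inner` is an absolute position inside the referee are all assumptions made for illustration; the types are the same simplified stand-ins, redefined so the block is self-contained.

```rust
use std::sync::Arc;

#[derive(Debug, Clone, PartialEq)]
pub enum Field {
    Int(i64),
    Text(String),
}

#[derive(Debug, Clone)]
pub enum RefOrField {
    Ref(Arc<RefRecord>),
    Field(Field),
}

#[derive(Debug, Clone)]
pub struct RefRecord {
    pub values: Vec<RefOrField>,
}

/// Looks up a field with two levels of indexing.
/// `outer` selects an entry of `record.values`. If that entry is a direct
/// field, it is returned and `inner` is ignored. If it is a reference,
/// `inner` selects a direct field of the referenced record.
pub fn get_field(record: &RefRecord, outer: usize, inner: usize) -> Option<&Field> {
    match record.values.get(outer)? {
        RefOrField::Field(field) => Some(field),
        RefOrField::Ref(referee) => match referee.values.get(inner)? {
            RefOrField::Field(field) => Some(field),
            // Only one level of reference is allowed, so a nested
            // reference is not a valid target.
            RefOrField::Ref(_) => None,
        },
    }
}
```

Out-of-range positions and nested references both yield `None` here; a real implementation might prefer to treat them as errors.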
Dereferencing through cloning
`RefSchema`/`RefRecord` are not suitable for serialization, because if we directly derive `Serialize`, all the referenced `RefSchema`/`RefRecord` will be serialized recursively. For ease of implementation, instead of implementing a custom serializer, we can "dereference" the `RefSchema`/`RefRecord` to a `Schema`/`Record`, cloning all the referenced `FieldDefinition`/`Field` in the process.
Use in JOIN
In the JOIN processor, we produce the output schema/record by referencing the left and right input schemas/records. Here we define two helper functions to do that.
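One way such a pair of helpers might be written is sketched below. This is a guess at their shape, not code taken from this post: the names `join_schemas` and `join_records`, the use of `Arc`, and the simplified types are assumptions. To respect the one-level rule, each input's existing references are passed through unchanged, and all of its direct fields are covered by a single reference to the input itself.

```rust
use std::sync::Arc;

#[derive(Debug, Clone, PartialEq)]
pub enum Field { Int(i64), Text(String) }

#[derive(Debug, Clone, PartialEq)]
pub struct FieldDefinition { pub name: String }

#[derive(Debug, Clone)]
pub enum RefOrField { Ref(Arc<RefRecord>), Field(Field) }

#[derive(Debug, Clone)]
pub struct RefRecord { pub values: Vec<RefOrField> }

#[derive(Debug, Clone)]
pub enum RefOrFieldDefinition {
    Ref { schema: Arc<RefSchema>, fields: Vec<usize> },
    Field(FieldDefinition),
}

#[derive(Debug, Clone)]
pub struct RefSchema { pub fields: Vec<RefOrFieldDefinition> }

/// Appends the columns contributed by one input schema.
fn reference_schema(input: &Arc<RefSchema>, out: &mut Vec<RefOrFieldDefinition>) {
    let mut direct = Vec::new();
    for (position, definition) in input.fields.iter().enumerate() {
        match definition {
            // Existing references are passed through, keeping one level.
            RefOrFieldDefinition::Ref { .. } => out.push(definition.clone()),
            RefOrFieldDefinition::Field(_) => direct.push(position),
        }
    }
    // One reference to the input covers all of its direct fields.
    if !direct.is_empty() {
        out.push(RefOrFieldDefinition::Ref { schema: Arc::clone(input), fields: direct });
    }
}

/// Appends the columns contributed by one input record, in the same
/// order as `reference_schema` so schema and record stay aligned.
fn reference_record(input: &Arc<RefRecord>, out: &mut Vec<RefOrField>) {
    let mut has_direct = false;
    for value in &input.values {
        match value {
            RefOrField::Ref(upstream) => out.push(RefOrField::Ref(Arc::clone(upstream))),
            RefOrField::Field(_) => has_direct = true,
        }
    }
    if has_direct {
        out.push(RefOrField::Ref(Arc::clone(input)));
    }
}

pub fn join_schemas(left: &Arc<RefSchema>, right: &Arc<RefSchema>) -> RefSchema {
    let mut fields = Vec::new();
    reference_schema(left, &mut fields);
    reference_schema(right, &mut fields);
    RefSchema { fields }
}

pub fn join_records(left: &Arc<RefRecord>, right: &Arc<RefRecord>) -> RefRecord {
    let mut values = Vec::new();
    reference_record(left, &mut values);
    reference_record(right, &mut values);
    RefRecord { values }
}
```

In this sketch an input's direct fields end up behind one reference placed after that input's passed-through references, so the output column order can differ from the input column order.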
Note that in the above functions, columns are reordered so that we don't keep duplicate references to the same `RefSchema`/`RefRecord`.
Now the JOIN processor can implement joining using referencing.
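To tie the pieces together, here is a small sketch of a nested-loop join that keeps its history as shared records, emits outputs by reference, and dereferences an output into a flat `Vec<Field>` only when one is needed (for example before serialization). The `Join` struct, its methods, the `Condition` alias, `first_fields_equal`, and `dereference` are hypothetical names. The sketch also assumes inputs that contain only direct fields, and `dereference` clones every direct field of a referee instead of consulting a schema for the referenced subset.

```rust
use std::sync::Arc;

#[derive(Debug, Clone, PartialEq)]
pub enum Field { Int(i64), Text(String) }

#[derive(Debug, Clone)]
pub enum RefOrField { Ref(Arc<RefRecord>), Field(Field) }

#[derive(Debug, Clone)]
pub struct RefRecord { pub values: Vec<RefOrField> }

/// Join condition evaluated on a (left, right) pair.
pub type Condition = fn(&RefRecord, &RefRecord) -> bool;

pub struct Join {
    left: Vec<Arc<RefRecord>>,
    right: Vec<Arc<RefRecord>>,
    condition: Condition,
}

/// Output record that references both inputs instead of copying them.
fn pair(left: &Arc<RefRecord>, right: &Arc<RefRecord>) -> RefRecord {
    RefRecord {
        values: vec![RefOrField::Ref(Arc::clone(left)), RefOrField::Ref(Arc::clone(right))],
    }
}

impl Join {
    pub fn new(condition: Condition) -> Self {
        Join { left: Vec::new(), right: Vec::new(), condition }
    }

    /// Matches a new left record against the right history, then stores it.
    pub fn insert_left(&mut self, record: Arc<RefRecord>) -> Vec<RefRecord> {
        let mut output = Vec::new();
        for other in &self.right {
            if (self.condition)(&record, other) {
                output.push(pair(&record, other));
            }
        }
        self.left.push(record);
        output
    }

    /// Matches a new right record against the left history, then stores it.
    pub fn insert_right(&mut self, record: Arc<RefRecord>) -> Vec<RefRecord> {
        let mut output = Vec::new();
        for other in &self.left {
            if (self.condition)(other, &record) {
                output.push(pair(other, &record));
            }
        }
        self.right.push(record);
        output
    }
}

/// Example condition: the first direct fields of both sides are equal.
pub fn first_fields_equal(left: &RefRecord, right: &RefRecord) -> bool {
    match (left.values.first(), right.values.first()) {
        (Some(RefOrField::Field(l)), Some(RefOrField::Field(r))) => l == r,
        _ => false,
    }
}

/// Flattens a record by cloning direct fields and the direct fields of
/// every referenced record (one level only).
pub fn dereference(record: &RefRecord) -> Vec<Field> {
    let mut fields = Vec::new();
    for value in &record.values {
        match value {
            RefOrField::Field(field) => fields.push(field.clone()),
            RefOrField::Ref(referee) => {
                for inner in &referee.values {
                    if let RefOrField::Field(field) = inner {
                        fields.push(field.clone());
                    }
                }
            }
        }
    }
    fields
}
```

Each output produced this way costs two reference-count increments; the field values are cloned only if and when `dereference` is called.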
@mediuminvader @v3g42 @snork-alt