Introduce schema evolution via the -S flag #164

Open · rtyler wants to merge 14 commits into main from schema-evolution-from-outside-in
Conversation

@rtyler (Member) commented Jan 7, 2024

This set of changes implements more nuanced handling of Delta vs. message schemas by introducing the -S command line flag, which enables schema evolution.

In order for schema evolution to work there is a necessary performance hit: kafka-delta-ingest must infer the schema of every message it reads from Kafka and, if necessary, add nullable columns to the Delta table. This is related to similar work in delta-rs for the RecordBatchWriter but deviates because of the mechanism by which RecordBatches and schemas are handled in kafka-delta-ingest.

Sponsored-by: Raft LLC

NOTE: This pull request builds on #162
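
For illustration, the per-message evolution step described above could look roughly like the following sketch. `evolve_schema` is a hypothetical helper and the arrow calls assume a recent arrow-json; this is not the PR's actual code.

    // Sketch only: `evolve_schema` is a hypothetical helper; the PR's real
    // code may structure this differently.
    use arrow::datatypes::{Field, Schema};
    use arrow::json::reader::infer_json_schema_from_iterator;
    use serde_json::Value;

    /// Return `table_schema` plus any field the message has that the table
    /// lacks, with every new column added as nullable.
    fn evolve_schema(
        table_schema: &Schema,
        message: &Value,
    ) -> Result<Schema, arrow::error::ArrowError> {
        let inferred = infer_json_schema_from_iterator(std::iter::once(Ok(message.clone())))?;
        let mut fields: Vec<Field> = table_schema
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();
        for field in inferred.fields() {
            if table_schema.field_with_name(field.name()).is_err() {
                // new columns must be nullable so existing rows remain valid
                fields.push(Field::new(field.name(), field.data_type().clone(), true));
            }
        }
        Ok(Schema::new(fields))
    }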

@rtyler force-pushed the schema-evolution-from-outside-in branch 3 times, most recently from e4a1ade to c1bcd37, on January 8, 2024 05:51
@rtyler changed the title from "Implementing stricter schema conformance and evolution" to "Introduce schema evolution via the -S flag" on Jan 8, 2024
@rtyler force-pushed the schema-evolution-from-outside-in branch from c1bcd37 to 820421f on January 8, 2024 22:53
I stumbled into this while pilfering code from kafka-delta-ingest for another
project and discovered that the code in `write_values` which does
`record_batch.schema() != arrow_schema` doesn't do what we think it does.

Basically, if `Decoder` "works", the schema it returns is just the schema that
was passed into it. It says nothing about whether the JSON actually has the
same schema. Don't ask me why.

Using the reader's `infer_json_schema_*` functions can provide a Schema that is
useful for comparison:

        use std::sync::Arc;
        use arrow::json::reader::{infer_json_schema_from_iterator, Decoder};

        let mut value_iter = json_buffer.iter().map(|j| Ok(j.to_owned()));
        // infer a schema from the JSON itself rather than trusting the Decoder
        let json_schema = infer_json_schema_from_iterator(value_iter.clone()).expect("Failed to infer!");
        let decoder = Decoder::new(Arc::new(json_schema), options);
        if let Some(batch) = decoder.next_batch(&mut value_iter).expect("Failed to create RecordBatch") {
            assert_eq!(batch.schema(), arrow_schema_ref, "Schemas don't match!");
        }

What's even more interesting is that after a certain number of fields are
removed, the Decoder no longer pretends it can decode the JSON. I am baffled as
to why.
…l refactor

The intention here is to enable more consistent schema handling within
the writers

Sponsored-by: Raft LLC
…e writers

The DeserializedMessage carries optional inferred schema information
along with the message itself. This is useful for understanding whether
schema evolution should happen "later" in the message processing
pipeline.

The downside of this behavior is that there will be a performance impact
as arrow_json does schema inference.

Sponsored-by: Raft LLC
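
As a rough illustration of the shape of this wrapper (struct and method names here are assumptions, not taken from the diff):

    use arrow::datatypes::Schema;
    use serde_json::Value;

    /// Sketch of the wrapper described above: the deserialized message plus
    /// whatever schema arrow_json inferred for it. Names are assumed.
    pub struct DeserializedMessage {
        message: Value,
        inferred_schema: Option<Schema>,
    }

    impl DeserializedMessage {
        pub fn message(&self) -> &Value {
            &self.message
        }

        pub fn inferred_schema(&self) -> Option<&Schema> {
            self.inferred_schema.as_ref()
        }
    }
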
Turning avro off drops about 50 crates from the default build, which is useful
for development, but the code would need to be cleaned up before removing this
from the default features list.

See #163
Identified by `cargo +nightly udeps`
This change is a little wrapped up in the introduction of
DeserializedMessage, but the trade-off for development targeting S3 is
that I am linking 382 crates every cycle as opposed to 451.

Fixes #163
I don't know why the impl was way down there 😄
This commit introduces some interplay between the IngestProcessor and
DataWriter, the latter of which needs to keep track of whether or not it
has a changed schema.

What should be done with that changed schema must necessarily live in
IngestProcessor since that will perform the Delta transaction commits at
the tail end of batch processing.

There are some potential mismatches between the schema in storage and
the one the DataWriter has, so this change tries to run the runloop again
if the current schema and the evolved schema are incompatible.

Closes #131

Sponsored-by: Raft LLC
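
The "incompatible" check described above could be as simple as verifying that every current column survives evolution with its type intact; a sketch under that assumption (not the PR's actual code):

    use arrow::datatypes::Schema;

    /// Sketch: an evolved schema is compatible when every current column is
    /// still present with the same type (evolution may only add columns).
    fn is_compatible(current: &Schema, evolved: &Schema) -> bool {
        current.fields().iter().all(|f| {
            evolved
                .field_with_name(f.name())
                .map(|e| e.data_type() == f.data_type())
                .unwrap_or(false)
        })
    }
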
This will ensure the non-evolution case stays relatively speedy!

Sponsored-by: Raft LLC
@rtyler rtyler marked this pull request as ready for review July 8, 2024 17:37
@rtyler rtyler enabled auto-merge July 8, 2024 17:38
@rtyler rtyler requested a review from mightyshazam July 8, 2024 17:38
@mightyshazam (Collaborator) left a comment

The transform usage won't work as implemented without a change, but everything else is nice to have.

     self.transformer
-        .transform(&mut v, None as Option<&BorrowedMessage>)?;
+        // TODO: this can't be right, shouldn't this function take DeserializedMessage
+        .transform(&mut v.clone().into(), None as Option<&BorrowedMessage>)?;
mightyshazam (Collaborator) commented:

I don't think this does what is intended. The transform function takes a mutable reference to the clone of v instead of a mutable reference to v itself. The function modifies and consumes the clone, but the clone only exists in the scope of the input to transform, so it is dropped once the method finishes. That means Ok(v) returns the original instead of the transformed message. Implementing Into<Value> for DeserializedMessage and changing the code to

    let mut dm = DeserializedMessage::from(v);
    self.transformer
        // TODO: this can't be right, shouldn't this function take DeserializedMessage
        .transform(&mut dm, None as Option<&BorrowedMessage>)?;

    Ok(dm.into())

should do the trick. Also, the Into implementation will allow removing all of the to_owned() instances that my last commit added.
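
For what it's worth, implementing From (which yields Into<Value> for free) could be as small as the following sketch, assuming DeserializedMessage wraps a message: Value field as in the sketch earlier:

    use serde_json::Value;

    impl From<DeserializedMessage> for Value {
        fn from(dm: DeserializedMessage) -> Self {
            // hand back the wrapped JSON, dropping the inferred schema
            dm.message
        }
    }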

rtyler (Member, Author) replied:

The transform code might be some bungled merge conflict resolution; I'll be able to look into this more shortly.

@@ -348,13 +349,13 @@ impl Transformer {
     /// The optional `kafka_message` must be provided to include well known Kafka properties in the value.
     pub(crate) fn transform<M>(
         &self,
-        value: &mut Value,
+        value: &mut DeserializedMessage,
         kafka_message: Option<&M>,
     ) -> Result<(), TransformError>
mightyshazam (Collaborator) commented:

Another option, with respect to my earlier comment about changing DeserializedMessage, is to have transform return Result<DeserializedMessage, TransformError>. Then it can take value: DeserializedMessage. That will allow us to mutate and return what we mutated, but that is a bigger code change.
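
Something along these lines, as a sketch (the body is elided and the bound on M is whatever the current impl uses):

    pub(crate) fn transform<M>(
        &self,
        mut value: DeserializedMessage,
        kafka_message: Option<&M>,
    ) -> Result<DeserializedMessage, TransformError> {
        // ...apply the configured transforms to `value` in place...
        Ok(value)
    }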

src/transforms.rs
@@ -442,7 +447,7 @@ mod tests {
     ];

     for i in 0..messages.len() {
-        assert_eq!(messages[i], expected[i]);
+        assert_eq!(messages[i].message().to_owned(), expected[i]);
mightyshazam (Collaborator) commented:

.into() works here if implemented, saving us a clone

}
}

let values = values.into_iter().map(|v| v.message().to_owned()).collect();
mightyshazam (Collaborator) commented:

.into() works here if implemented, saving us a clone
