Add flag to support gzip compressed messages #179

Merged: 3 commits, Jul 7, 2024

@geosach geosach commented Jul 7, 2024

Gzip decompression flag

Summary

This pull request adds support for ingesting gzip-compressed messages in the kafka-delta-ingest project. When the new flag is set, gzip-compressed Kafka messages are decompressed before further processing and ingestion into Delta Lake.

Changes

  1. New Feature: Gzip Decompression
    • Added the --decompress_gzip flag to the command-line options.
    • Modified the MessageDeserializerFactory and related deserialization logic to support gzip decompression.
    • Updated main.rs to handle the new command-line argument and pass it to the deserialization process.
    • Updated lib.rs to integrate the new gzip decompression functionality.
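
To illustrate where a decompression step can sit relative to deserialization, here is a minimal, dependency-free sketch. It is not the code merged in this PR: `looks_like_gzip` and `prepare_payload` are hypothetical names, the `inflate` callback stands in for a real gzip decoder (for example one built on the flate2 crate), and rejecting non-gzip payloads when the flag is set is a choice made for this sketch only. The one fact it relies on is that every gzip stream starts with the bytes 0x1f 0x8b (RFC 1952).

```rust
use std::io;

/// First two bytes of every gzip member (RFC 1952: ID1 = 0x1f, ID2 = 0x8b).
const GZIP_MAGIC: [u8; 2] = [0x1f, 0x8b];

/// Returns true when the payload starts with the gzip magic bytes.
pub fn looks_like_gzip(payload: &[u8]) -> bool {
    payload.starts_with(&GZIP_MAGIC)
}

/// Hypothetical pre-processing step run before deserialization.
///
/// * `decompress_gzip` mirrors the `--decompress_gzip` flag.
/// * `inflate` is injected so the sketch needs no external crate; a real
///   build would pass a closure that wraps an actual gzip decoder.
///
/// Assumption of this sketch: with the flag set, a payload that does not
/// look like gzip is reported as an error instead of being passed through.
pub fn prepare_payload<F>(
    payload: &[u8],
    decompress_gzip: bool,
    inflate: F,
) -> io::Result<Vec<u8>>
where
    F: Fn(&[u8]) -> io::Result<Vec<u8>>,
{
    if !decompress_gzip {
        // Flag not set: hand the bytes to the deserializer unchanged.
        return Ok(payload.to_vec());
    }
    if !looks_like_gzip(payload) {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "payload is not gzip-compressed",
        ));
    }
    inflate(payload)
}
```

Keeping the decoder behind a callback means the flag handling can be unit-tested with a stub, which may be a starting point for the automated tests mentioned in the notes.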

Notes

  • I tested the new functionality with both gzip-compressed and uncompressed messages to ensure compatibility and correct behavior in both scenarios.
  • While I have tested the feature manually, I did not write automated tests for this new functionality. If this change is accepted and additional tests are required, I would be happy to help implement them with some guidance on the specific test cases and scenarios that need to be covered.
  • I noticed that the GitHub Actions tests sometimes fail (on older commits as well) with the following error:
---- test_start_from_latest stdout ----
thread 'test_start_from_latest' panicked at tests/helpers/mod.rs:361:13:
File was not created before timeout
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace


failures:
    test_start_from_latest

test result: FAILED. 4 passed; 1 failed; 0 ignored; 0 measured; 0 filtered out; finished in 203.42s

However, since this failure also occurs on older commits, it does not appear to be caused by this new functionality.

Usage

Use the --decompress_gzip flag to enable gzip decompression. Ensure that the Kafka producer is configured to produce gzip-compressed messages.

RUST_LOG=debug cargo run ingest <kafka_topic> <delta_table_path> \
  --allowed_latency 60 \
  --app_id <app_id> \
  --decompress_gzip \
  --auto_offset_reset earliest

This command enables gzip decompression for Kafka messages ingested into the Delta Lake table.

Conclusion

This enhancement allows for more flexible and efficient processing of Kafka messages, particularly in environments where data compression is essential. Please review the changes and let me know if any adjustments or additional tests are required. Thank you for considering this contribution.

@rtyler rtyler merged commit 0585509 into delta-io:main Jul 7, 2024
3 checks passed