diff --git a/Cargo.lock b/Cargo.lock index 4bad2e3..7432cbe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2768,6 +2768,7 @@ dependencies = [ "dipstick", "dynamodb_lock", "env_logger", + "flate2", "futures", "jmespatch", "lazy_static", diff --git a/Cargo.toml b/Cargo.toml index 9f34759..4a9985f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ authors = ["R. Tyler Croy ", "Christian Williams deserializer, - Err(e) => return Err(IngestError::UnableToCreateDeserializer { source: e }), - }; + let deserializer = + match MessageDeserializerFactory::try_build(&opts.input_format, opts.decompress_gzip) { + Ok(deserializer) => deserializer, + Err(e) => return Err(IngestError::UnableToCreateDeserializer { source: e }), + }; + Ok(IngestProcessor { topic, consumer, diff --git a/src/main.rs b/src/main.rs index 56a2c50..3420f28 100644 --- a/src/main.rs +++ b/src/main.rs @@ -141,6 +141,8 @@ async fn main() -> anyhow::Result<()> { let end_at_last_offsets = ingest_matches.get_flag("end"); + let decompress_gzip = ingest_matches.get_flag("decompress_gzip"); + // ingest_matches.get_flag("end") let format = convert_matches_to_message_format(ingest_matches).unwrap(); @@ -161,6 +163,7 @@ async fn main() -> anyhow::Result<()> { statsd_endpoint, input_format: format, end_at_last_offsets, + decompress_gzip, }; tokio::spawn(async move { @@ -452,6 +455,11 @@ This can be used to provide TLS configuration as in: .num_args(0) .action(ArgAction::SetTrue) .help("")) + .arg(Arg::new("decompress_gzip") + .long("decompress_gzip") + .env("DECOMPRESS_GZIP") + .help("Enable gzip decompression for incoming messages") + .action(ArgAction::SetTrue)) ) .arg_required_else_help(true) } diff --git a/src/serialization.rs b/src/serialization.rs index 9dfc68b..6068203 100644 --- a/src/serialization.rs +++ b/src/serialization.rs @@ -1,12 +1,11 @@ -use std::{borrow::BorrowMut, convert::TryFrom, io::Cursor, path::PathBuf}; - +use crate::{dead_letters::DeadLetter, MessageDeserializationError, MessageFormat}; use async_trait::async_trait; +use flate2::read::GzDecoder; use schema_registry_converter::async_impl::{ easy_avro::EasyAvroDecoder, easy_json::EasyJsonDecoder, schema_registry::SrSettings, }; use serde_json::Value; - -use crate::{dead_letters::DeadLetter, MessageDeserializationError, MessageFormat}; +use std::{borrow::BorrowMut, convert::TryFrom, io::Cursor, io::Read, path::PathBuf}; #[async_trait] pub(crate) trait MessageDeserializer { @@ -17,20 +16,22 @@ pub(crate) trait MessageDeserializer { } pub(crate) struct MessageDeserializerFactory {} + impl MessageDeserializerFactory { pub fn try_build( input_format: &MessageFormat, + decompress_gzip: bool, // Add this parameter ) -> Result, anyhow::Error> { match input_format { MessageFormat::Json(data) => match data { - crate::SchemaSource::None => Ok(Self::json_default()), + crate::SchemaSource::None => Ok(Self::json_default(decompress_gzip)), crate::SchemaSource::SchemaRegistry(sr) => { match Self::build_sr_settings(sr).map(JsonDeserializer::from_schema_registry) { Ok(s) => Ok(Box::new(s)), Err(e) => Err(e), } } - crate::SchemaSource::File(_) => Ok(Self::json_default()), + crate::SchemaSource::File(_) => Ok(Self::json_default(decompress_gzip)), }, MessageFormat::Avro(data) => match data { crate::SchemaSource::None => Ok(Box::::default()), @@ -47,12 +48,12 @@ impl MessageDeserializerFactory { } } }, - _ => Ok(Box::new(DefaultDeserializer {})), + _ => Ok(Box::new(DefaultDeserializer::new(decompress_gzip))), } } - fn json_default() -> Box { - Box::new(DefaultDeserializer {}) + fn json_default(decompress_gzip: bool) -> Box { + Box::new(DefaultDeserializer::new(decompress_gzip)) } fn build_sr_settings(registry_url: &url::Url) -> Result { @@ -80,16 +81,41 @@ impl MessageDeserializerFactory { } } -struct DefaultDeserializer {} +struct DefaultDeserializer { + decompress_gzip: bool, +} + +impl DefaultDeserializer { + pub fn new(decompress_gzip: bool) -> Self { + DefaultDeserializer { decompress_gzip } + } + + fn decompress(bytes: &[u8]) -> std::io::Result> { + let mut decoder = GzDecoder::new(bytes); + let mut decompressed_data = Vec::new(); + decoder.read_to_end(&mut decompressed_data)?; + Ok(decompressed_data) + } +} #[async_trait] impl MessageDeserializer for DefaultDeserializer { async fn deserialize(&mut self, payload: &[u8]) -> Result { - let value: Value = match serde_json::from_slice(payload) { + let payload = if self.decompress_gzip { + Self::decompress(payload).map_err(|e| { + MessageDeserializationError::JsonDeserialization { + dead_letter: DeadLetter::from_failed_deserialization(payload, e.to_string()), + } + })? + } else { + payload.to_vec() + }; + + let value: Value = match serde_json::from_slice(&payload) { Ok(v) => v, Err(e) => { return Err(MessageDeserializationError::JsonDeserialization { - dead_letter: DeadLetter::from_failed_deserialization(payload, e.to_string()), + dead_letter: DeadLetter::from_failed_deserialization(&payload, e.to_string()), }); } };