Skip to content

Commit

Permalink
Reduce json memory (#2214)
Browse files Browse the repository at this point in the history
* Use ijson to make json more compact in memory

* Update to [email protected].

This allows for a custom bincode encoding for the `Field` type, which in
turn allows to encode Field::Json as msgpack. This is required because
bincode is not self-describing, which json requires.

* Temporarily silence clippy large_error_variant warning
  • Loading branch information
Jesse-Bakker authored Nov 8, 2023
1 parent f56204d commit a12f3d0
Show file tree
Hide file tree
Showing 72 changed files with 1,140 additions and 1,088 deletions.
3 changes: 3 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
[build]
# Compile the whole workspace with tokio's unstable cfg enabled
# (required for tokio APIs gated behind `--cfg tokio_unstable`).
rustflags = ["--cfg", "tokio_unstable"]

[profile.test]
# Turn off link-time optimization for test builds — presumably to keep
# test compile/link times down; confirm with the team before relying on it.
lto = "off"
84 changes: 76 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@ members = [
]
resolver = "2"

[workspace.dependencies]
# bincode 2.x release candidate, shared across the workspace so every crate
# uses the same version; the "derive" feature enables #[derive(Encode, Decode)].
bincode = { version = "2.0.0-rc.3", features = ["derive"]}

[patch.crates-io]
# Replace the crates.io rust-postgres family with getdozer's fork for the
# whole workspace (all four crates must come from the same source tree).
postgres = { git = "https://github.com/getdozer/rust-postgres" }
postgres-protocol = { git = "https://github.com/getdozer/rust-postgres" }
postgres-types = { git = "https://github.com/getdozer/rust-postgres" }
tokio-postgres = { git = "https://github.com/getdozer/rust-postgres" }


2 changes: 1 addition & 1 deletion dozer-api/src/grpc/internal/internal_pipeline_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ async fn serialize_log_response(response: LogResponseFuture) -> Result<LogRespon
let response = response
.await
.map_err(|e| Status::new(tonic::Code::Internal, e.to_string()))?;
let data = bincode::serialize(&response).map_err(|e| {
let data = bincode::encode_to_vec(&response, bincode::config::legacy()).map_err(|e| {
Status::new(
tonic::Code::Internal,
format!("Failed to serialize response: {}", e),
Expand Down
2 changes: 1 addition & 1 deletion dozer-api/src/rest/api_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ fn record_to_map(
let mut map = IndexMap::new();

for (field_def, field) in schema.fields.iter().zip(record.record.values) {
let val = field_to_json_value(field)?;
let val = field_to_json_value(field);
map.insert(field_def.name.clone(), val);
}

Expand Down
1 change: 1 addition & 0 deletions dozer-cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ ahash = "0.8.3"
metrics = "0.21.0"
clap = { version = "4.4.1", features = ["derive"] }
env_logger = "0.10.0"
bincode = { workspace = true }

[dev-dependencies]
criterion = "0.4"
Expand Down
5 changes: 3 additions & 2 deletions dozer-cache/src/cache/lmdb/cache/main_environment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ pub trait MainEnvironment: LmdbEnvironment {
.commit_state
.load(txn)?
.map(|commit_state| {
bincode::deserialize(commit_state.borrow())
bincode::decode_from_slice(commit_state.borrow(), bincode::config::legacy())
.map(|v| v.0)
.map_err(CacheError::map_deserialization_error)
})
.transpose()
Expand Down Expand Up @@ -416,7 +417,7 @@ impl RwMainEnvironment {
let txn = self.env.txn_mut()?;
self.common.commit_state.store(
txn,
bincode::serialize(state)
bincode::encode_to_vec(state, bincode::config::legacy())
.map_err(CacheError::map_serialization_error)?
.as_slice(),
)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,25 @@ impl BorrowEncode for Operation {

impl<'a> Encode<'a> for OperationBorrow<'a> {
fn encode(self) -> Result<Encoded<'a>, StorageError> {
dozer_types::bincode::serialize(&self)
.map(Encoded::Vec)
.map_err(|e| StorageError::SerializationError {
let encoded = bincode::encode_to_vec(self, bincode::config::legacy()).map_err(|e| {
StorageError::SerializationError {
typ: "Operation",
reason: Box::new(e),
})
}
})?;
Ok(Encoded::Vec(encoded))
}
}

impl Decode for Operation {
fn decode(bytes: &[u8]) -> Result<Cow<Self>, StorageError> {
dozer_types::bincode::deserialize(bytes)
.map(Cow::Owned)
let decoded = dozer_types::bincode::decode_from_slice(bytes, bincode::config::legacy())
.map_err(|e| StorageError::DeserializationError {
typ: "Operation",
reason: Box::new(e),
})
})?
.0;
Ok(Cow::Owned(decoded))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,33 @@ use dozer_tracing::Labels;
use dozer_types::{
borrow::{Borrow, Cow, IntoOwned},
log::info,
serde::{Deserialize, Serialize},
types::Record,
};
use metrics::{describe_counter, increment_counter};

use crate::cache::{CacheRecord, RecordMeta};

#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(crate = "dozer_types::serde")]
#[derive(Debug, Clone, PartialEq, bincode::Decode)]
pub enum Operation {
Delete {
/// The operation id of an `Insert` operation, which must exist.
operation_id: u64,
},
Insert {
#[bincode(with_serde)]
record_meta: RecordMeta,
record: Record,
},
}

#[derive(Debug, Clone, Copy, Serialize)]
#[serde(crate = "dozer_types::serde")]
#[derive(Debug, Clone, Copy, bincode::Encode)]
pub enum OperationBorrow<'a> {
Delete {
/// The operation id of an `Insert` operation, which must exist.
operation_id: u64,
},
Insert {
#[bincode(with_serde)]
record_meta: RecordMeta,
record: &'a Record,
},
Expand Down
Loading

0 comments on commit a12f3d0

Please sign in to comment.