Skip to content

Commit

Permalink
Make quantity field human readable (#891)
Browse files Browse the repository at this point in the history
Add support for human readable quantity field in the different json config, i.e, 10KB -> 10000.
This implementation still allows for the field being pure unsigned integers, so, it can be seen as offering an alternative configuration.

Co-authored-by: Marcus Eagan <[email protected]>
  • Loading branch information
matdexir and MarcusSorealheis authored Jun 3, 2024
1 parent 77b2c33 commit da2c4a7
Show file tree
Hide file tree
Showing 9 changed files with 456 additions and 23 deletions.
262 changes: 260 additions & 2 deletions Cargo.lock

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions nativelink-config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load(
"rust_doc",
"rust_doc_test",
"rust_library",
"rust_test_suite",
)

rust_library(
Expand All @@ -16,11 +17,29 @@ rust_library(
],
visibility = ["//visibility:public"],
deps = [
"@crates//:byte-unit",
"@crates//:humantime",
"@crates//:serde",
"@crates//:shellexpand",
],
)

rust_test_suite(
name = "integration",
timeout = "short",
srcs = [
"tests/deserialization_test.rs",
],
deps = [
"//nativelink-config",
"//nativelink-error",
"@crates//:byte-unit",
"@crates//:humantime",
"@crates//:serde",
"@crates//:serde_json5",
],
)

rust_doc(
name = "docs",
crate = ":nativelink-config",
Expand Down
5 changes: 4 additions & 1 deletion nativelink-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@ version = "0.4.0"
edition = "2021"

[dependencies]
serde = { version = "1.0.201", features = ["derive"] }
byte-unit = "5.1.4"
humantime = "2.1.0"
serde = { version = "1.0.198", features = ["derive"] }
serde_json5 = "0.1.0"
shellexpand = "3.1.0"
11 changes: 6 additions & 5 deletions nativelink-config/src/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use serde::Deserialize;

use crate::schedulers::SchedulerConfig;
use crate::serde_utils::{
convert_data_size_with_shellexpand, convert_duration_with_shellexpand,
convert_numeric_with_shellexpand, convert_optional_numeric_with_shellexpand,
convert_optional_string_with_shellexpand, convert_string_with_shellexpand,
convert_vec_string_with_shellexpand,
Expand Down Expand Up @@ -139,7 +140,7 @@ pub struct ByteStreamConfig {
/// 16KiB - 64KiB is optimal.
///
/// Defaults: 64KiB
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub max_bytes_per_stream: usize,

/// In the event a client disconnects while uploading a blob, we will hold
Expand All @@ -148,7 +149,7 @@ pub struct ByteStreamConfig {
/// the same blob.
///
/// Defaults: 10 (seconds)
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub persist_stream_on_disconnect_timeout: usize,
}

Expand Down Expand Up @@ -557,7 +558,7 @@ pub struct LocalWorkerConfig {
/// longer than this time limit, the task will be rejected. Value in seconds.
///
/// Default: 1200 (seconds / 20 mins)
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub max_action_timeout: usize,

/// If timeout is handled in `entrypoint` or another wrapper script.
Expand Down Expand Up @@ -667,7 +668,7 @@ pub struct GlobalConfig {
/// a new file descriptor because the limit has been reached.
///
/// Default: 1000 (1 second)
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub idle_file_descriptor_timeout_millis: u64,

/// This flag can be used to prevent metrics from being collected at runtime.
Expand Down Expand Up @@ -695,7 +696,7 @@ pub struct GlobalConfig {
/// digest.
///
/// Default: 1024*1024 (1MiB)
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub default_digest_size_health_check: usize,
}

Expand Down
2 changes: 1 addition & 1 deletion nativelink-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@

pub mod cas_server;
pub mod schedulers;
mod serde_utils;
pub mod serde_utils;
pub mod stores;
6 changes: 3 additions & 3 deletions nativelink-config/src/schedulers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::HashMap;

use serde::Deserialize;

use crate::serde_utils::convert_numeric_with_shellexpand;
use crate::serde_utils::{convert_duration_with_shellexpand, convert_numeric_with_shellexpand};
use crate::stores::{GrpcEndpoint, Retry, StoreRefName};

#[allow(non_camel_case_types)]
Expand Down Expand Up @@ -97,13 +97,13 @@ pub struct SimpleScheduler {
/// The amount of time to retain completed actions in memory for in case
/// a WaitExecution is called after the action has completed.
/// Default: 60 (seconds)
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub retain_completed_for_s: u64,

/// Remove workers from pool once the worker has not responded in this
/// amount of time in seconds.
/// Default: 5 (seconds)
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub worker_timeout_s: u64,

/// If a job returns an internal error or times out this many times when
Expand Down
80 changes: 80 additions & 0 deletions nativelink-config/src/serde_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::fmt;
use std::marker::PhantomData;
use std::str::FromStr;

use byte_unit::Byte;
use humantime::parse_duration;
use serde::{de, Deserialize, Deserializer};

/// Helper for serde macro so you can use shellexpand variables in the json configuration
Expand Down Expand Up @@ -138,3 +140,81 @@ pub fn convert_optional_string_with_shellexpand<'de, D: Deserializer<'de>>(
Ok(None)
}
}

pub fn convert_data_size_with_shellexpand<'de, D, T, E>(deserializer: D) -> Result<T, D::Error>
where
D: Deserializer<'de>,
E: fmt::Display,
T: TryFrom<i64> + FromStr<Err = E>,
<T as TryFrom<i64>>::Error: fmt::Display,
{
// define a visitor that deserializes
// `ActualData` encoded as json within a string
struct USizeVisitor<T: TryFrom<i64>>(PhantomData<T>);

impl<'de, T, FromStrErr> de::Visitor<'de> for USizeVisitor<T>
where
FromStrErr: fmt::Display,
T: TryFrom<i64> + FromStr<Err = FromStrErr>,
<T as TryFrom<i64>>::Error: fmt::Display,
{
type Value = T;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a string containing json data")
}

fn visit_i64<E: de::Error>(self, v: i64) -> Result<Self::Value, E> {
v.try_into().map_err(de::Error::custom)
}

fn visit_str<E: de::Error>(self, v: &str) -> Result<Self::Value, E> {
let expanded = (*shellexpand::env(v).map_err(de::Error::custom)?).to_string();
let byte_size = Byte::parse_str(expanded, true).map_err(de::Error::custom)?;
let byte_size_u128 = byte_size.as_u128();
T::try_from(byte_size_u128.try_into().map_err(de::Error::custom)?)
.map_err(de::Error::custom)
}
}

deserializer.deserialize_any(USizeVisitor::<T>(PhantomData::<T> {}))
}

pub fn convert_duration_with_shellexpand<'de, D, T, E>(deserializer: D) -> Result<T, D::Error>
where
D: Deserializer<'de>,
E: fmt::Display,
T: TryFrom<i64> + FromStr<Err = E>,
<T as TryFrom<i64>>::Error: fmt::Display,
{
// define a visitor that deserializes
// `ActualData` encoded as json within a string
struct USizeVisitor<T: TryFrom<i64>>(PhantomData<T>);

impl<'de, T, FromStrErr> de::Visitor<'de> for USizeVisitor<T>
where
FromStrErr: fmt::Display,
T: TryFrom<i64> + FromStr<Err = FromStrErr>,
<T as TryFrom<i64>>::Error: fmt::Display,
{
type Value = T;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a string containing json data")
}

fn visit_i64<E: de::Error>(self, v: i64) -> Result<Self::Value, E> {
v.try_into().map_err(de::Error::custom)
}

fn visit_str<E: de::Error>(self, v: &str) -> Result<Self::Value, E> {
let expanded = (*shellexpand::env(v).map_err(de::Error::custom)?).to_string();
let duration = parse_duration(&expanded).map_err(de::Error::custom)?;
let duration_secs = duration.as_secs();
T::try_from(duration_secs.try_into().map_err(de::Error::custom)?)
.map_err(de::Error::custom)
}
}

deserializer.deserialize_any(USizeVisitor::<T>(PhantomData::<T> {}))
}
23 changes: 12 additions & 11 deletions nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use serde::{Deserialize, Serialize};

use crate::serde_utils::{
convert_data_size_with_shellexpand, convert_duration_with_shellexpand,
convert_numeric_with_shellexpand, convert_optional_string_with_shellexpand,
convert_string_with_shellexpand, convert_vec_string_with_shellexpand,
};
Expand Down Expand Up @@ -203,7 +204,7 @@ pub struct ShardStore {
#[serde(deny_unknown_fields)]
pub struct SizePartitioningStore {
/// Size to partition the data on.
#[serde(deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(deserialize_with = "convert_data_size_with_shellexpand")]
pub size: u64,

/// Store to send data when object is < (less than) size.
Expand Down Expand Up @@ -243,7 +244,7 @@ pub struct FilesystemStore {
/// Buffer size to use when reading files. Generally this should be left
/// to the default value except for testing.
/// Default: 32k.
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub read_buffer_size: u32,

/// Policy used to evict items out of the store. Failure to set this
Expand All @@ -255,7 +256,7 @@ pub struct FilesystemStore {
/// value is used to determine an entry's actual size on disk consumed
/// For a 4KB block size filesystem, a 1B file actually consumes 4KB
/// Default: 4096
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub block_size: u64,
}

Expand Down Expand Up @@ -297,7 +298,7 @@ pub struct DedupStore {
/// deciding where to partition the data.
///
/// Default: 65536 (64k)
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub min_size: u32,

/// A best-effort attempt will be made to keep the average size
Expand All @@ -311,13 +312,13 @@ pub struct DedupStore {
/// details.
///
/// Default: 262144 (256k)
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub normal_size: u32,

/// Maximum size a chunk is allowed to be.
///
/// Default: 524288 (512k)
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub max_size: u32,

/// Due to implementation detail, we want to prefer to download
Expand Down Expand Up @@ -396,7 +397,7 @@ pub struct Lz4Config {
/// compression ratios.
///
/// Default: 65536 (64k).
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub block_size: u32,

/// Maximum size allowed to attempt to deserialize data into.
Expand All @@ -407,7 +408,7 @@ pub struct Lz4Config {
/// allow you to specify the maximum that we'll attempt deserialize.
///
/// Default: value in `block_size`.
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub max_decode_block_size: u32,
}

Expand Down Expand Up @@ -447,19 +448,19 @@ pub struct CompressionStore {
pub struct EvictionPolicy {
/// Maximum number of bytes before eviction takes place.
/// Default: 0. Zero means never evict based on size.
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub max_bytes: usize,

/// When eviction starts based on hitting max_bytes, continue until
/// max_bytes - evict_bytes is met to create a low watermark. This stops
/// operations from thrashing when the store is close to the limit.
/// Default: 0
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub evict_bytes: usize,

/// Maximum number of seconds for an entry to live before an eviction.
/// Default: 0. Zero means never evict based on time.
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub max_seconds: u32,

/// Maximum size of the store before an eviction takes place.
Expand Down
71 changes: 71 additions & 0 deletions nativelink-config/tests/deserialization_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2023 The NativeLink Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use nativelink_config::serde_utils::{
convert_data_size_with_shellexpand, convert_duration_with_shellexpand,
};
use serde::Deserialize;

#[derive(Deserialize)]
struct DurationEntity {
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
duration: usize,
}

#[derive(Deserialize)]
struct DataSizeEntity {
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
data_size: usize,
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_duration_human_readable_deserialize() {
let example = r#"
{"duration": "1m 10s"}
"#;
let deserialized: DurationEntity = serde_json5::from_str(example).unwrap();
assert_eq!(deserialized.duration, 70);
}

#[test]
fn test_duration_usize_deserialize() {
let example = r#"
{"duration": 10}
"#;
let deserialized: DurationEntity = serde_json5::from_str(example).unwrap();
assert_eq!(deserialized.duration, 10);
}

#[test]
fn test_data_size_unit_deserialize() {
let example = r#"
{"data_size": "1KiB"}
"#;
let deserialized: DataSizeEntity = serde_json5::from_str(example).unwrap();
assert_eq!(deserialized.data_size, 1024);
}

#[test]
fn test_data_size_usize_deserialize() {
let example = r#"
{"data_size": 10}
"#;
let deserialized: DataSizeEntity = serde_json5::from_str(example).unwrap();
assert_eq!(deserialized.data_size, 10);
}
}

0 comments on commit da2c4a7

Please sign in to comment.