Skip to content

Commit

Permalink
Expose fingerprint hash to metrics in redis store (#1347)
Browse files Browse the repository at this point in the history
To allow external services to know what the active fingerprint is to
query redis directly, we expose the fingerprint hash via metrics.
  • Loading branch information
allada authored Oct 1, 2024
1 parent 4b4f047 commit 8a90f09
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 1 deletion.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions nativelink-store/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ rust_test_suite(
"//nativelink-config",
"//nativelink-error",
"//nativelink-metric",
"//nativelink-metric-collector",
"//nativelink-proto",
"//nativelink-util",
"@crates//:async-lock",
Expand All @@ -123,11 +124,13 @@ rust_test_suite(
"@crates//:parking_lot",
"@crates//:pretty_assertions",
"@crates//:rand",
"@crates//:serde_json",
"@crates//:serial_test",
"@crates//:sha2",
"@crates//:tokio",
"@crates//:tokio-stream",
"@crates//:tracing",
"@crates//:tracing-subscriber",
"@crates//:uuid",
],
)
Expand Down
3 changes: 3 additions & 0 deletions nativelink-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ uuid = { version = "1.8.0", default-features = false, features = ["v4", "serde"]

[dev-dependencies]
nativelink-macro = { path = "../nativelink-macro" }
nativelink-metric-collector = { path = "../nativelink-metric-collector" }
pretty_assertions = { version = "1.4.0", features = ["std"] }
memory-stats = "1.2.0"
mock_instant = "0.3.2"
Expand All @@ -77,4 +78,6 @@ aws-smithy-runtime-api = "=1.7.1"
serial_test = { version = "3.1.1", features = [
"async",
], default-features = false }
serde_json = "1.0.128"
fred = { version = "9.1.2", default-features = false, features = ["mocks"] }
tracing-subscriber = { version = "0.3.18", default-features = false }
15 changes: 15 additions & 0 deletions nativelink-store/src/redis_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,33 @@ use crate::redis_utils::ft_aggregate;
pub const READ_CHUNK_SIZE: usize = 64 * 1024;
const CONNECTION_POOL_SIZE: usize = 3;

fn to_hex(value: &u32) -> String {
format!("{value:08x}")
}

/// A [`StoreDriver`] implementation that uses Redis as a backing store.
#[derive(MetricsComponent)]
pub struct RedisStore {
/// The client pool connecting to the backing Redis instance(s).
client_pool: RedisPool,

/// A channel to publish updates to when a key is added, removed, or modified.
#[metric(
help = "The pubsub channel to publish updates to when a key is added, removed, or modified"
)]
pub_sub_channel: Option<String>,

/// A redis client for managing subscriptions.
/// TODO: This should be moved into the store in followups once a standard use pattern has been determined.
subscriber_client: SubscriberClient,

/// For metrics only.
#[metric(
help = "A unique identifier for the FT.CREATE command used to create the index template",
handler = to_hex
)]
fingerprint_create_index: u32,

/// A function used to generate names for temporary keys.
temp_name_generator_fn: fn() -> String,

Expand Down Expand Up @@ -159,6 +173,7 @@ impl RedisStore {
client_pool,
pub_sub_channel,
subscriber_client,
fingerprint_create_index: fingerprint_create_index_template(),
temp_name_generator_fn,
key_prefix,
update_if_version_matches_script: Script::from_lua(LUA_VERSION_SET_SCRIPT),
Expand Down
71 changes: 70 additions & 1 deletion nativelink-store/tests/redis_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,19 @@ use fred::prelude::Builder;
use fred::types::{RedisConfig, RedisValue};
use nativelink_error::{Code, Error};
use nativelink_macro::nativelink_test;
use nativelink_metric::{MetricFieldData, MetricKind, MetricsComponent, RootMetricsComponent};
use nativelink_metric_collector::MetricsCollectorLayer;
use nativelink_store::cas_utils::ZERO_BYTE_DIGESTS;
use nativelink_store::redis_store::{RedisStore, READ_CHUNK_SIZE};
use nativelink_store::store_manager::StoreManager;
use nativelink_util::buf_channel::make_buf_channel_pair;
use nativelink_util::common::DigestInfo;
use nativelink_util::store_trait::{StoreLike, UploadSizeInfo};
use nativelink_util::store_trait::{Store, StoreLike, UploadSizeInfo};
use parking_lot::RwLock;
use pretty_assertions::assert_eq;
use serde_json::{from_str, to_string, Value};
use tokio::sync::watch;
use tracing_subscriber::layer::SubscriberExt;

const VALID_HASH1: &str = "3031323334353637383961626364656630303030303030303030303030303030";
const TEMP_UUID: &str = "550e8400-e29b-41d4-a716-446655440000";
Expand Down Expand Up @@ -664,3 +670,66 @@ async fn dont_loop_forever_on_empty() -> Result<(), Error> {

Ok(())
}

#[nativelink_test]
async fn test_redis_fingerprint_metric() -> Result<(), Error> {
let expected_fingerprint_value: String = String::from("3e762c15");

let store_manager = Arc::new(StoreManager::new());

{
let store = {
let mut builder = Builder::default_centralized();
builder.set_config(RedisConfig {
mocks: Some(Arc::new(MockRedisBackend::new()) as Arc<dyn Mocks>),
..Default::default()
});

Store::new(Arc::new(RedisStore::new_from_builder_and_parts(
builder,
None,
mock_uuid_generator,
String::new(),
)?))
};

store_manager.add_store("redis_store", store);
};

let root_metrics = Arc::new(RwLock::new(RootMetricsTest {
stores: store_manager.clone(),
}));

let (layer, output_metrics) = MetricsCollectorLayer::new();

tracing::subscriber::with_default(tracing_subscriber::registry().with(layer), || {
let metrics_component = root_metrics.read();
MetricsComponent::publish(
&*metrics_component,
MetricKind::Component,
MetricFieldData::default(),
)
})
.unwrap();

let output_json_data = to_string(&*output_metrics.lock()).unwrap();

let parsed_output: Value = from_str(&output_json_data).unwrap();

let fingerprint_create_index = parsed_output["stores"]["redis_store"]
["fingerprint_create_index"]
.as_str()
.expect("fingerprint_create_index should be a hex string");

assert_eq!(fingerprint_create_index, expected_fingerprint_value);

Ok(())
}

#[derive(MetricsComponent)]
struct RootMetricsTest {
#[metric(group = "stores")]
stores: Arc<dyn RootMetricsComponent>,
}

impl RootMetricsComponent for RootMetricsTest {}

0 comments on commit 8a90f09

Please sign in to comment.