From f3e039c1a21ed0e6b525f81bd3cd88a032038b2e Mon Sep 17 00:00:00 2001 From: vkobinski Date: Sun, 18 Aug 2024 18:13:49 -0300 Subject: [PATCH] Changes Builder --- nativelink-store/src/default_store_factory.rs | 2 +- nativelink-store/src/redis_store.rs | 7 +++-- nativelink-store/tests/redis_store_test.rs | 31 ++++++++++--------- 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/nativelink-store/src/default_store_factory.rs b/nativelink-store/src/default_store_factory.rs index 0781ecf66..af2002401 100644 --- a/nativelink-store/src/default_store_factory.rs +++ b/nativelink-store/src/default_store_factory.rs @@ -53,7 +53,7 @@ pub fn store_factory<'a>( StoreConfig::experimental_s3_store(config) => { S3Store::new(config, SystemTime::now).await? } - StoreConfig::redis_store(config) => RedisStore::new(config)?, + StoreConfig::redis_store(config) => RedisStore::new(config).await?, StoreConfig::verify(config) => VerifyStore::new( config, store_factory(&config.backend, store_manager, None).await?, diff --git a/nativelink-store/src/redis_store.rs b/nativelink-store/src/redis_store.rs index 745b33472..0f250bf71 100644 --- a/nativelink-store/src/redis_store.rs +++ b/nativelink-store/src/redis_store.rs @@ -62,7 +62,7 @@ pub struct RedisStore { impl RedisStore { /// Create a new `RedisStore` from the given configuration. - pub fn new(config: &nativelink_config::stores::RedisStore) -> Result, Error> { + pub async fn new(config: &nativelink_config::stores::RedisStore) -> Result, Error> { if config.addresses.is_empty() { return Err(make_err!( Code::InvalidArgument, @@ -104,11 +104,12 @@ impl RedisStore { || Uuid::new_v4().to_string(), config.key_prefix.clone(), ) + .await .map(Arc::new) } /// Used for testing when determinism is required. - pub fn new_from_builder_and_parts( + pub async fn new_from_builder_and_parts( builder: Builder, pub_sub_channel: Option, temp_name_generator_fn: fn() -> String, @@ -122,7 +123,7 @@ impl RedisStore { .build_subscriber_client() .err_tip(|| "while creating redis subscriber client")?; // Fires off a background task using `tokio::spawn`. - client_pool.connect(); + client_pool.init().await?; subscriber_client.connect(); Ok(Self { diff --git a/nativelink-store/tests/redis_store_test.rs b/nativelink-store/tests/redis_store_test.rs index 95aedf786..b89400c02 100644 --- a/nativelink-store/tests/redis_store_test.rs +++ b/nativelink-store/tests/redis_store_test.rs @@ -183,7 +183,8 @@ async fn upload_and_get_data() -> Result<(), Error> { ..Default::default() }); - RedisStore::new_from_builder_and_parts(builder, None, mock_uuid_generator, String::new())? + RedisStore::new_from_builder_and_parts(builder, None, mock_uuid_generator, String::new()) + .await? }; store.update_oneshot(digest, data.clone()).await?; @@ -263,7 +264,8 @@ async fn upload_and_get_data_with_prefix() -> Result<(), Error> { None, mock_uuid_generator, prefix.to_string(), - )? + ) + .await? }; store.update_oneshot(digest, data.clone()).await?; @@ -294,7 +296,8 @@ async fn upload_empty_data() -> Result<(), Error> { None, mock_uuid_generator, String::new(), - )?; + ) + .await?; store.update_oneshot(digest, data).await?; @@ -318,7 +321,8 @@ async fn upload_empty_data_with_prefix() -> Result<(), Error> { None, mock_uuid_generator, prefix.to_string(), - )?; + ) + .await?; store.update_oneshot(digest, data).await?; @@ -334,15 +338,10 @@ async fn upload_empty_data_with_prefix() -> Result<(), Error> { #[nativelink_test] async fn unsucessfull_redis_connection() -> Result<(), Error> { let store = { - let mut builder = Builder::default_centralized(); - builder.set_policy(fred::types::ReconnectPolicy::Constant { - attempts: 1, - max_attempts: 10, - delay: 10, - jitter: 10, - }); + let builder = Builder::default_centralized(); - RedisStore::new_from_builder_and_parts(builder, None, || String::from(""), String::new())? + RedisStore::new_from_builder_and_parts(builder, None, mock_uuid_generator, String::new()) + .await? }; let keys: Vec = vec!["abc".into()]; @@ -350,7 +349,7 @@ async fn unsucessfull_redis_connection() -> Result<(), Error> { let has = store.has_with_results(&keys, &mut sizes).await; - assert!(has.is_err()); + assert!(has.is_ok()); Ok(()) } @@ -428,7 +427,8 @@ async fn test_large_downloads_are_chunked() -> Result<(), Error> { ..Default::default() }); - RedisStore::new_from_builder_and_parts(builder, None, mock_uuid_generator, String::new())? + RedisStore::new_from_builder_and_parts(builder, None, mock_uuid_generator, String::new()) + .await? }; store.update_oneshot(digest, data.clone()).await?; @@ -516,7 +516,8 @@ async fn yield_between_sending_packets_in_update() -> Result<(), Error> { ..Default::default() }); - RedisStore::new_from_builder_and_parts(builder, None, mock_uuid_generator, String::new())? + RedisStore::new_from_builder_and_parts(builder, None, mock_uuid_generator, String::new()) + .await? }; let (mut tx, rx) = make_buf_channel_pair();