diff --git a/Cargo.lock b/Cargo.lock index e1758706a..52b4123d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -693,9 +693,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.6" +version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2aba8f4e9906c7ce3c73463f62a7f0c65183ada1a2d47e397cc8810827f9694f" +checksum = "26a5c3fd7bfa1ce3897a3a3501d362b2d87b7f2583ebcb4a949ec25911025cbc" [[package]] name = "cfg-if" @@ -749,20 +749,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0" -[[package]] -name = "combine" -version = "4.6.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" -dependencies = [ - "bytes", - "futures-core", - "memchr", - "pin-project-lite", - "tokio", - "tokio-util", -] - [[package]] name = "concurrent-queue" version = "2.5.0" @@ -815,6 +801,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2" +[[package]] +name = "cookie-factory" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "396de984970346b0d9e93d1415082923c679e5ae5c3ee3dcbd104f5610af126b" + [[package]] name = "core-foundation" version = "0.9.4" @@ -873,6 +865,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.20" @@ -1004,6 +1005,15 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "float-cmp" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" +dependencies = [ + "num-traits", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1025,6 +1035,35 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db0f0c49aba98a3b2578315766960bd242885ff672fd62610c5557cd6c6efe03" +[[package]] +name = "fred" +version = "9.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "915e065b377f6e16d5c01eae96bf31eeaf81e1e300b76f938761b3c21307cad8" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "bytes-utils", + "crossbeam-queue", + "float-cmp", + "futures", + "log", + "parking_lot", + "rand", + "redis-protocol", + "rustls 0.23.12", + "rustls-native-certs 0.7.1", + "semver", + "socket2", + "tokio", + "tokio-rustls 0.26.0", + "tokio-stream", + "tokio-util", + "url", + "urlencoding", +] + [[package]] name = "futures" version = "0.3.30" @@ -1371,7 +1410,7 @@ dependencies = [ "hyper 0.14.30", "log", "rustls 0.21.12", - "rustls-native-certs", + "rustls-native-certs 0.6.3", "tokio", "tokio-rustls 0.24.1", "webpki-roots", @@ -1693,12 +1732,12 @@ dependencies = [ name = "nativelink-error" version = "0.4.0" dependencies = [ + "fred", "hex", "nativelink-metric", "nativelink-proto", "prost", "prost-types", - "redis", "serde", "tokio", "tonic", @@ -1843,6 +1882,7 @@ dependencies = [ "byteorder", "bytes", "filetime", + "fred", "futures", "hex", "http 1.1.0", @@ -1863,8 +1903,6 @@ dependencies = [ "pretty_assertions", "prost", "rand", - "redis", - "redis-test", "serde", "serial_test", "sha2", @@ -2064,9 +2102,9 @@ dependencies = [ [[package]] name = "ordered-float" -version = "4.2.1" +version = "4.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19ff2cf528c6c03d9ed653d6c4ce1dc0582dc4af309790ad92f07c1cd551b0be" +checksum = "4a91171844676f8c7990ce64959210cd2eaef32c2612c50f9fae9f8aaa6065a6" dependencies = [ "num-traits", ] @@ -2384,41 +2422,17 @@ dependencies = [ ] [[package]] -name = "redis" -version = "0.25.4" +name = "redis-protocol" +version = "5.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0d7a6955c7511f60f3ba9e86c6d02b3c3f144f8c24b288d1f4e18074ab8bbec" +checksum = "65deb7c9501fbb2b6f812a30d59c0253779480853545153a51d8e9e444ddc99f" dependencies = [ - "arc-swap", - "async-trait", "bytes", - "combine", + "bytes-utils", + "cookie-factory", "crc16", - "futures", - "futures-util", - "itoa", "log", - "percent-encoding", - "pin-project-lite", - "rand", - "ryu", - "sha1_smol", - "socket2", - "tokio", - "tokio-retry", - "tokio-rustls 0.25.0", - "tokio-util", - "url", -] - -[[package]] -name = "redis-test" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a948b3cec9e4b1fedbb0f0788e79029fb1f641b6cfefb7a15d044f803854427" -dependencies = [ - "futures", - "redis", + "nom", ] [[package]] @@ -2575,7 +2589,6 @@ version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" dependencies = [ - "log", "ring", "rustls-pki-types", "rustls-webpki 0.102.6", @@ -2610,6 +2623,19 @@ dependencies = [ "security-framework", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a88d6d420651b496bdd98684116959239430022a115c1240e6c3993be0b15fba" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.1.2", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "1.0.4" @@ -2670,9 +2696,9 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" [[package]] name = "scc" -version = "2.1.5" +version = "2.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fadf67e3cf23f8b11a6c8c48a16cb2437381503615acd91094ec7b4686a5a53" +checksum = "05ccfb12511cdb770157ace92d7dda771e498445b78f9886e8cdbc5140a4eced" dependencies = [ "sdd", ] @@ -2704,9 +2730,9 @@ dependencies = [ [[package]] name = "sdd" -version = "1.7.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85f05a494052771fc5bd0619742363b5e24e5ad72ab3111ec2e27925b8edc5f3" +checksum = "177258b64c0faaa9ffd3c65cd3262c2bc7e2588dbbd9c1641d0346145c1bbda8" [[package]] name = "security-framework" @@ -2759,12 +2785,13 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.120" +version = "1.0.121" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e0d21c9a8cae1235ad58a00c11cb40d4b1e5c784f1ef2c537876ed6ffd8b7c5" +checksum = "4ab380d7d9f22ef3f21ad3e6c1ebe8e4fc7a2000ccba2e4d71fc96f15b2cb609" dependencies = [ "indexmap 2.2.6", "itoa", + "memchr", "ryu", "serde", ] @@ -2815,12 +2842,6 @@ dependencies = [ "digest", ] -[[package]] -name = "sha1_smol" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" - [[package]] name = "sha2" version = "0.10.8" @@ -3061,17 +3082,6 @@ dependencies = [ "syn 2.0.72", ] -[[package]] -name = "tokio-retry" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" -dependencies = [ - "pin-project", - "rand", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.24.1" diff --git a/Cargo.toml b/Cargo.toml index 9484b8d9a..d008f5d59 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,7 +53,9 @@ rustls-pemfile = { version = "2.1.2", default-features = false } scopeguard = { version = "1.2.0", default-features = false } serde_json5 = "0.1.0" tokio = { version = "1.38.0", features = ["rt-multi-thread", "signal"] } -tokio-rustls = { version = "0.25.0", default-features = false } +tokio-rustls = { version = "0.25.0", default-features = false, features = [ + "ring", +] } tonic = { version = "0.12.0", default-features = false } tower = { version = "0.4.13", default-features = false } tracing = { version = "0.1.40", default-features = false } diff --git a/nativelink-config/examples/redis.json b/nativelink-config/examples/redis.json index 5cb273659..1f06fe560 100644 --- a/nativelink-config/examples/redis.json +++ b/nativelink-config/examples/redis.json @@ -1,123 +1,85 @@ { - "stores": { - "CAS_FAST_SLOW_STORE": { - "fast_slow": { - "fast": { - "redis_store": { - "addresses": [ - "redis://127.0.0.1:6379/", // Master node 1 - "redis://127.0.0.1:6380/", // Master node 2 - "redis://127.0.0.1:6381/", // Master node 3 - "redis://127.0.0.1:6382/", // Master node 3 - "redis://127.0.0.1:6383/", // Master node 3 - "redis://127.0.0.1:6384/" // Master node 3 - //"redis://172.18.0.2:6379/" // Master node 3 - // "redis://172.18.0.3:6379/", // Master node 3 - // "redis://172.18.0.4:6379/" // Master node 3 - ] - } - }, - "slow": { - "filesystem": { - "content_path": "/tmp/nativelink/data/content_path-index", - "temp_path": "/tmp/nativelink/data/tmp_path-index", - "eviction_policy": { - "max_bytes": 120000000000 - } - } + "stores": { + "CAS_FAST_SLOW_STORE": { + "redis_store": { + "addresses": ["redis://127.0.0.1:6379/"], + "mode": "cluster" + } + }, + "AC_FAST_SLOW_STORE": { + "redis_store": { + "addresses": ["redis://127.0.0.1:6379/"], + "mode": "cluster" + } + }, + "AC_MAIN_STORE": { + "completeness_checking": { + "backend": { + "ref_store": { + "name": "AC_FAST_SLOW_STORE" + } + }, + "cas_store": { + "ref_store": { + "name": "CAS_MAIN_STORE" } } - }, - "AC_FAST_SLOW_STORE": { - "fast_slow": { - "fast": { - "filesystem": { - "content_path": "/tmp/nativelink/data/content_path-index", - "temp_path": "/tmp/nativelink/data/tmp_path-index", - "eviction_policy": { - "max_bytes": 120000000000 - } - } - }, - "slow": { - "filesystem": { - "content_path": "/tmp/nativelink/data/content_path-ac", - "temp_path": "/tmp/nativelink/data/tmp_path-ac", - "eviction_policy": { - "max_bytes": 5000000000 + } + }, + "CAS_MAIN_STORE": { + "existence_cache": { + "backend": { + "compression": { + "compression_algorithm": { + "lz4": {} + }, + "backend": { + "ref_store": { + "name": "CAS_FAST_SLOW_STORE" } } } } - }, - "AC_MAIN_STORE": { - "completeness_checking": { - "backend": { - "ref_store": { - "name": "AC_FAST_SLOW_STORE" - } - }, - "cas_store": { - "ref_store": { - "name": "CAS_MAIN_STORE" - } - } + } + } + }, + "servers": [ + { + "listener": { + "http": { + "socket_address": "0.0.0.0:50051" } }, - "CAS_MAIN_STORE": { - "existence_cache": { - "backend": { - "compression": { - "compression_algorithm": { - "lz4": {} - }, - "backend": { - "ref_store": { - "name": "CAS_FAST_SLOW_STORE" - } - } - } + "services": { + "cas": { + "main": { + "cas_store": "CAS_MAIN_STORE" } - } - } - }, - "servers": [ - { - "listener": { - "http": { - "socket_address": "0.0.0.0:50051" + }, + "ac": { + "main": { + "ac_store": "AC_MAIN_STORE" } }, - "services": { - "cas": { - "main": { - "cas_store": "CAS_MAIN_STORE" - } - }, - "ac": { - "main": { - "ac_store": "AC_MAIN_STORE" - } - }, - "capabilities": {}, - "bytestream": { - "cas_stores": { - "main": "CAS_MAIN_STORE" - } + "capabilities": {}, + "bytestream": { + "cas_stores": { + "main": "CAS_MAIN_STORE" } } + } + }, + { + "listener": { + "http": { + "socket_address": "0.0.0.0:50061" + } }, - { - "listener": { - "http": { - "socket_address": "0.0.0.0:50061" - } - }, - "services": { - "experimental_prometheus": { - "path": "/metrics" - } + "services": { + "experimental_prometheus": { + "path": "/metrics" } } - ] - } + } + ] +} diff --git a/nativelink-error/BUILD.bazel b/nativelink-error/BUILD.bazel index 841be517c..08d8783de 100644 --- a/nativelink-error/BUILD.bazel +++ b/nativelink-error/BUILD.bazel @@ -14,10 +14,10 @@ rust_library( deps = [ "//nativelink-metric", "//nativelink-proto", + "@crates//:fred", "@crates//:hex", "@crates//:prost", "@crates//:prost-types", - "@crates//:redis", "@crates//:serde", "@crates//:tokio", "@crates//:tonic", diff --git a/nativelink-error/Cargo.toml b/nativelink-error/Cargo.toml index e6496b098..e5c16d08b 100644 --- a/nativelink-error/Cargo.toml +++ b/nativelink-error/Cargo.toml @@ -10,10 +10,12 @@ autobenches = false [dependencies] nativelink-proto = { path = "../nativelink-proto" } nativelink-metric = { path = "../nativelink-metric" } +fred = { version = "9.0.3", default-features = false, features = [ + "enable-rustls-ring", +] } hex = { version = "0.4.3", default-features = false } prost = { version = "0.13.1", default-features = false } prost-types = { version = "0.13.1", default-features = false } -redis = { version = "0.25.4", default-features = false } serde = { version = "1.0.204", default-features = false } tokio = { version = "1.38.0" } tonic = { version = "0.12.0", default-features = false } diff --git a/nativelink-error/src/lib.rs b/nativelink-error/src/lib.rs index 776f312ad..fee8093bd 100644 --- a/nativelink-error/src/lib.rs +++ b/nativelink-error/src/lib.rs @@ -211,18 +211,35 @@ impl From for Error { } } -impl From for Error { - fn from(err: redis::RedisError) -> Self { - make_err!(Code::Internal, "{}", err.to_string()) - } -} - impl From for Error { fn from(code: Code) -> Self { make_err!(code, "") } } +impl From for Error { + fn from(error: fred::error::RedisError) -> Self { + use fred::error::RedisErrorKind::{ + Auth, Backpressure, Canceled, Cluster, Config, InvalidArgument, InvalidCommand, + NotFound, Parse, Protocol, Sentinel, Timeout, Tls, Unknown, Url, IO, + }; + + // Conversions here are based on https://grpc.github.io/grpc/core/md_doc_statuscodes.html. + let code = match error.kind() { + Config | InvalidCommand | InvalidArgument | Url => Code::InvalidArgument, + IO | Protocol | Tls | Cluster | Parse | Sentinel => Code::Internal, + Auth => Code::PermissionDenied, + Canceled => Code::Aborted, + Unknown => Code::Unknown, + Timeout => Code::DeadlineExceeded, + NotFound => Code::NotFound, + Backpressure => Code::Unavailable, + }; + + make_err!(code, "{error}") + } +} + impl From for Error { fn from(error: tonic::transport::Error) -> Self { make_err!(Code::Internal, "{}", error.to_string()) diff --git a/nativelink-store/BUILD.bazel b/nativelink-store/BUILD.bazel index d011476d1..7000db9cc 100644 --- a/nativelink-store/BUILD.bazel +++ b/nativelink-store/BUILD.bazel @@ -49,6 +49,7 @@ rust_library( "@crates//:byteorder", "@crates//:bytes", "@crates//:filetime", + "@crates//:fred", "@crates//:futures", "@crates//:hex", "@crates//:http-body", @@ -58,7 +59,6 @@ rust_library( "@crates//:parking_lot", "@crates//:prost", "@crates//:rand", - "@crates//:redis", "@crates//:serde", "@crates//:tokio", "@crates//:tokio-stream", @@ -108,6 +108,7 @@ rust_test_suite( "@crates//:bincode", "@crates//:bytes", "@crates//:filetime", + "@crates//:fred", "@crates//:futures", "@crates//:hex", "@crates//:http", @@ -118,8 +119,6 @@ rust_test_suite( "@crates//:parking_lot", "@crates//:pretty_assertions", "@crates//:rand", - "@crates//:redis", - "@crates//:redis-test", "@crates//:serial_test", "@crates//:sha2", "@crates//:tokio", diff --git a/nativelink-store/Cargo.toml b/nativelink-store/Cargo.toml index 87a562a39..958946ddc 100644 --- a/nativelink-store/Cargo.toml +++ b/nativelink-store/Cargo.toml @@ -11,24 +11,37 @@ nativelink-proto = { path = "../nativelink-proto" } nativelink-metric = { path = "../nativelink-metric" } async-lock = { version = "3.3.0", default-features = false } async-trait = "0.1.80" -aws-config = { version = "1.5.4", default-features = false, features = ["rustls"] } -aws-sdk-s3 = { version = "1.41.0", features = ["rt-tokio"], default-features = false } +aws-config = { version = "1.5.4", default-features = false, features = [ + "rustls", +] } +aws-sdk-s3 = { version = "1.41.0", default-features = false } aws-smithy-runtime = { version = "1.6.2" } bincode = "1.3.3" blake3 = { version = "1.5.2", default-features = false } byteorder = { version = "1.5.0", default-features = false } bytes = { version = "1.6.1", default-features = false } filetime = "0.2.23" +fred = { version = "9.0.3", features = [ + "enable-rustls-ring", + "metrics", + "blocking-encoding", + "custom-reconnect-errors", + "sentinel-client", + "sentinel-auth", + "subscriber-client", + "mocks", +] } futures = { version = "0.3.30", default-features = false } hex = { version = "0.4.3", default-features = false } http-body = "1.0.1" hyper = { version = "0.14.30" } -hyper-rustls = { version = "0.24.2", default-features = false, features = ["webpki-roots"] } +hyper-rustls = { version = "0.24.2", default-features = false, features = [ + "webpki-roots", +] } lz4_flex = { version = "0.11.3", default-features = false } parking_lot = "0.12.3" prost = { version = "0.13.1", default-features = false } rand = { version = "0.8.5", default-features = false } -redis = { version = "0.25.4", features = ["cluster-async", "connection-manager", "tokio-comp", "tokio-rustls"], default-features = false } serde = { version = "1.0.204", default-features = false } tokio = "1.38.0" tokio-stream = { version = "0.1.15", default-features = false } @@ -39,7 +52,6 @@ uuid = { version = "1.10.0", default-features = false } [dev-dependencies] nativelink-macro = { path = "../nativelink-macro" } -redis-test = { version = "0.4.0", features = ["aio"], default-features = false } pretty_assertions = { version = "1.4.0", features = ["std"] } memory-stats = "1.2.0" mock_instant = "0.3.2" @@ -47,6 +59,13 @@ once_cell = { version = "1.19.0", default-features = false } sha2 = { version = "0.10.8", default-features = false } http = { version = "1.1.0", default-features = false } aws-smithy-types = "1.2.0" -aws-smithy-runtime = { version = "1.6.2", features = ["test-util"], default-features = false } +aws-smithy-runtime = { version = "1.6.2", features = [ + "test-util", +], default-features = false } +aws-sdk-s3 = { version = "1.41.0", features = [ + "rt-tokio", +], default-features = false } aws-smithy-runtime-api = "1.7.1" -serial_test = { version = "3.1.1", features = ["async"], default-features = false } +serial_test = { version = "3.1.1", features = [ + "async", +], default-features = false } diff --git a/nativelink-store/src/redis_store.rs b/nativelink-store/src/redis_store.rs index c72e29d9f..7980a5368 100644 --- a/nativelink-store/src/redis_store.rs +++ b/nativelink-store/src/redis_store.rs @@ -13,240 +13,41 @@ // limitations under the License. use std::borrow::Cow; -use std::cell::{OnceCell, UnsafeCell}; -use std::fmt::Display; +use std::cmp; use std::pin::Pin; -use std::sync::{Arc, Once}; +use std::sync::Arc; +use std::time::Duration; use async_trait::async_trait; use bytes::Bytes; -use futures::future::{ErrInto, FutureExt, Shared}; -use futures::stream::FuturesOrdered; -use futures::{Future, TryFutureExt, TryStreamExt}; +use fred::clients::RedisPool; +use fred::interfaces::{ClientLike, KeysInterface, PubsubInterface}; +use fred::types::{Builder, ConnectionConfig, PerformanceConfig, ReconnectPolicy, RedisConfig}; use nativelink_config::stores::RedisMode; -use nativelink_error::{error_if, make_err, Code, Error, ResultExt}; +use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_metric::MetricsComponent; -use nativelink_util::background_spawn; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator}; use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo}; -use redis::aio::{ConnectionLike, ConnectionManager}; -use redis::cluster_async::ClusterConnection; -use redis::{AsyncCommands, ToRedisArgs}; -use tokio::task::JoinHandle; +use uuid::Uuid; use crate::cas_utils::is_zero_digest; -const READ_CHUNK_SIZE: isize = 64 * 1024; - -/// A wrapper type containing the different Redis clients we support. -// -// Typically we would use `dyn ConnectionLike` instead of creating a wrapper type, but these clients are cheaply -// cloneable; you're meant to clone a client in order to get mutable access. -// [`Clone`] has a [`Sized`] bound, which means that any supertrait we constructed with a `Clone` bound -// wouldn't be object safe -- in short, this won't compile: -// -// ```compile_fail -// trait CloneableConnectionHandle: ConnectionLike + Clone {} -// -// impl CloneableConnectionHandle for C {} -// ``` -#[derive(Clone)] -pub enum ConnectionKind { - Cluster(ClusterConnection), - Single(ConnectionManager), -} - -impl From for ConnectionKind { - fn from(value: ClusterConnection) -> Self { - Self::Cluster(value) - } -} - -impl From for ConnectionKind { - fn from(value: ConnectionManager) -> Self { - Self::Single(value) - } -} - -// delegate to the inner impl's -impl ConnectionLike for ConnectionKind { - fn req_packed_command<'a>( - &'a mut self, - cmd: &'a redis::Cmd, - ) -> redis::RedisFuture<'a, redis::Value> { - match self { - ConnectionKind::Cluster(inner) => inner.req_packed_command(cmd), - ConnectionKind::Single(inner) => inner.req_packed_command(cmd), - } - } - - fn req_packed_commands<'a>( - &'a mut self, - cmd: &'a redis::Pipeline, - offset: usize, - count: usize, - ) -> redis::RedisFuture<'a, Vec> { - match self { - ConnectionKind::Cluster(inner) => inner.req_packed_commands(cmd, offset, count), - ConnectionKind::Single(inner) => inner.req_packed_commands(cmd, offset, count), - } - } - - fn get_db(&self) -> i64 { - match self { - ConnectionKind::Cluster(inner) => inner.get_db(), - ConnectionKind::Single(inner) => inner.get_db(), - } - } -} - -/// Type alias for a [`Shared`] [`JoinHandle`] that has had its [`JoinError`](`tokio::task::JoinError`) mapped to an [`Error`] -type RedisConnectionFuture = Shared>, Error>>; - -/// Represents the possible states of a Redis connection. -enum ConnectionState { - /// Contains a future that must be polled to connect to Redis - Connecting(RedisConnectionFuture), - - /// Contains a connection that was made successfully - Connected(C), - - /// Contains an error that occurred while connecting - Errored(Error), -} - -/// Represents a connection to Redis. -pub struct BackgroundConnection { - /// Synchronization primitive used for tracking if `self.state` has been changed from `ConnectionState::Connecting` - /// to `ConnectionState::Error` or `ConnectionState::Connected`. Once it's been changed exactly once, - /// [`Once::is_completed`] will return `true`. - once: Once, - - /// Contains the current state of the connection. - // Invariant: the state must be mutated exactly once. - state: UnsafeCell>, -} - -impl BackgroundConnection { - /// Connect to a single Redis instance. - /// - /// ## Errors - /// - /// Some cursory checks are performed on the given connection info that can fail before a connection is established. - /// Errors that occur during the connection process are surfaced when the connection is first used. - pub fn single(params: T) -> Result { - let client = redis::Client::open(params).map_err(from_redis_err)?; - let init = async move { client.get_connection_manager().await }; - Ok(Self::with_initializer(init)) - } - - /// Connect to multiple Redis instances configured in cluster mode - /// - /// ## Errors - /// - /// Some cursory checks are performed on the given connection info that can fail before a connection is established. - /// Errors that occur during the connection are surfaced when the connection is first used. - pub fn cluster( - params: impl IntoIterator, - ) -> Result { - let client = redis::cluster::ClusterClient::new(params).map_err(from_redis_err)?; - let init = async move { client.get_async_connection().await }; - Ok(Self::with_initializer(init)) - } -} - -impl BackgroundConnection { - /// Initialize a new connection by spawning a background task to run the provided future to completion. - /// - /// Outside of testing, you will probably want to use [`BackgroundConnection::single`] or [`BackgroundConnection::cluster`]. - pub fn with_initializer(init: Fut) -> Self - where - Fut: Future> + Send + 'static, - C: From, - T: Send + 'static, - Error: From, - E: Send + 'static, - { - let handle = background_spawn!("redis_initial_connection", init.err_into().ok_into()); - let state = ConnectionState::Connecting(handle.err_into().shared()); - Self { - once: Once::new(), - state: UnsafeCell::new(state), - } - } - - /// Retrieve the underlying connection. If the connection hasn't been established yet, the current task will - /// wait until the connection has been made. - /// - /// ## Errors - /// - /// Returns an error if there was an issue establishing a connection to Redis. - async fn get(&self) -> Result { - // Safety: we don't mutate state here, so normal borrowck rules are followed and the invariant is upheld - let state_ref = unsafe { &*self.state.get() }; - let connection_future = match state_ref { - ConnectionState::Connecting(handle) => Shared::clone(handle), - ConnectionState::Connected(connection) => return Ok(connection.clone()), - ConnectionState::Errored(error) => return Err(error.clone()), - }; - - let connection_result = connection_future.await.and_then(|conn| conn); - self.once.call_once(|| { - // Safety: This part is `unsafe` because we break borrowck's rules of aliasing XOR mutability; - // calling `LazyConnection::get` takes `&self`, but now we're going to take `&mut self.state`. - // This means that if multiple tasks call `LazyConnection::get` at once, they could potentially - // attempt to mutate `self.state` simultaneously, which is `unsafe` -- it's a data race. - // - // The synchronization primitive we're using here to manually enforce borrowck's rules is [`Once`], - // which allows for executing closures exactly once. We use this guarantee to ensure that we only - // mutate `self.state` exactly once, despite having multiple tasks awaiting the same connection. - // - // Put another way: we only mutate state exactly once, inside this closure (exclusive). Outside of - // the closure, multiple tasks can read state simultaneously (aliasing). - // - // More specifically, the `Once` can be in one of three states: - // - // 1. Uninitialized - // In this state, borrowck rules are followed because nobody has attempted to mutate `self.state` yet. - // 2. Initializing - // In this state, borrowck rules are followed because exactly one thread has exclusive mutable access - // to `self.state`, while all other threads block -- i.e. they will not read or write state. - // 3. Initialized - // In this state, borrowck rules are followed because this closure will never get called. - // - // Put a third way: we've essentially recreated a `RwLock` that always prioritizes writes, and only allows - // one write. The invariant is upheld. - let state_mut = unsafe { &mut *self.state.get() }; - *state_mut = match connection_result.clone() { - Ok(connection) => ConnectionState::Connected(connection), - Err(error) => ConnectionState::Errored(error), - }; - }); - - connection_result - } -} - -// Safety: We don't hold any raw pointers or `!Send` types except `UnsafeCell`. -// Why do we need `C: Send`? -// Task A creates a `BackgroundConnection` and shares it with task B, -// which mutates the cell, which is then destroyed by A. -// That is, destructor observes a sent value. -unsafe impl Send for BackgroundConnection {} - -// Safety: We ensure that exactly one task will mutate state exactly once. -unsafe impl Sync for BackgroundConnection {} +// TODO(caass): These (and other settings) should be made configurable via nativelink-config. +pub const READ_CHUNK_SIZE: usize = 64 * 1024; +const CONNECTION_POOL_SIZE: usize = 3; /// A [`StoreDriver`] implementation that uses Redis as a backing store. #[derive(MetricsComponent)] -pub struct RedisStore { - /// The connection to the underlying Redis instance(s). - connection: BackgroundConnection, +pub struct RedisStore { + /// The client pool connecting to the backing Redis instance(s). + client_pool: RedisPool, + + /// A channel to publish updates to when a key is added, removed, or modified. + pub_sub_channel: Option, /// A function used to generate names for temporary keys. temp_name_generator_fn: fn() -> String, - pub_sub_channel: Option, /// A common prefix to append to all keys before they are sent to Redis. /// @@ -255,165 +56,159 @@ pub struct RedisStore { key_prefix: String, } -impl RedisStore { +impl RedisStore { + /// Create a new `RedisStore` from the given configuration. pub fn new(config: &nativelink_config::stores::RedisStore) -> Result, Error> { if config.addresses.is_empty() { - return Err(Error::new( + return Err(make_err!( Code::InvalidArgument, - "At least one address must be specified to connect to Redis".to_string(), + "No addresses were specified in redis store configuration." )); }; - let connection = - match config.mode { - RedisMode::Cluster => { - let addrs = config.addresses.iter().map(String::as_str); - BackgroundConnection::cluster(addrs)? - } - RedisMode::Standard if config.addresses.len() > 1 => return Err(Error::new( - Code::InvalidArgument, - "Attempted to connect to multiple addresses without setting `cluster = true`" - .to_string(), - )), - RedisMode::Standard => { - let addr = config.addresses[0].as_str(); - BackgroundConnection::single(addr)? - } - RedisMode::Sentinel => { - return Err(Error::new( - Code::Unimplemented, - "Sentinel mode is currently not supported.".to_string(), - )) - } - }; - - Ok(Arc::new( - RedisStore::new_with_conn_and_name_generator_and_prefix( - connection, - || uuid::Uuid::new_v4().to_string(), - config.experimental_pub_sub_channel.clone(), - config.key_prefix.clone(), - ), - )) - } -} - -impl RedisStore { - #[inline] - pub fn new_with_conn_and_name_generator( - connection: BackgroundConnection, - temp_name_generator_fn: fn() -> String, - ) -> Self { - RedisStore::new_with_conn_and_name_generator_and_prefix( - connection, - temp_name_generator_fn, - None, - String::new(), + let [addr] = config.addresses.as_slice() else { + return Err(make_err!(Code::Unimplemented, "Connecting directly to multiple redis nodes in a cluster is currently unsupported. Please specify a single URL to a single node, and nativelink will use cluster discover to find the other nodes.")); + }; + let redis_config = match config.mode { + RedisMode::Cluster => RedisConfig::from_url_clustered(addr), + RedisMode::Sentinel => RedisConfig::from_url_sentinel(addr), + RedisMode::Standard => RedisConfig::from_url_centralized(addr), + } + .err_tip_with_code(|e| { + ( + Code::InvalidArgument, + format!("while parsing redis node address: {e}"), + ) + })?; + + let mut builder = Builder::from_config(redis_config); + builder + .set_performance_config(PerformanceConfig { + default_command_timeout: Duration::from_secs(config.response_timeout_s), + ..Default::default() + }) + .set_connection_config(ConnectionConfig { + connection_timeout: Duration::from_secs(config.connection_timeout_s), + internal_command_timeout: Duration::from_secs(config.response_timeout_s), + ..Default::default() + }) + // TODO(caass): Make this configurable. + .set_policy(ReconnectPolicy::new_constant(1, 0)); + + Self::new_from_builder_and_parts( + builder, + config.experimental_pub_sub_channel.clone(), + || Uuid::new_v4().to_string(), + config.key_prefix.clone(), ) + .map(Arc::new) } - #[inline] - pub fn new_with_conn_and_name_generator_and_prefix( - connection: BackgroundConnection, - temp_name_generator_fn: fn() -> String, + /// Used for testing when determinism is required. + pub fn new_from_builder_and_parts( + builder: Builder, pub_sub_channel: Option, + temp_name_generator_fn: fn() -> String, key_prefix: String, - ) -> Self { - RedisStore { - connection, - temp_name_generator_fn, + ) -> Result { + let client_pool = builder + .build_pool(CONNECTION_POOL_SIZE) + .err_tip(|| "while creating redis connection pool")?; + + // Fires off a background task using `tokio::spawn`. + client_pool.connect(); + + Ok(Self { + client_pool, pub_sub_channel, + temp_name_generator_fn, key_prefix, - } - } - - #[inline] - pub async fn get_conn(&self) -> Result { - self.connection.get().await + }) } /// Encode a [`StoreKey`] so it can be sent to Redis. - fn encode_key<'a>(&self, key: StoreKey<'a>) -> impl ToRedisArgs + Display + Send + Sync + 'a { - // TODO(caass): Once https://github.com/redis-rs/redis-rs/pull/1219 makes it into a release, - // this can be changed to - // ```rust - // if self.key_prefix.is_empty() { - // key.as_str() - // } else { - // let mut encoded_key = String::with_capacity(self.key_prefix.len() + key_body.len()); - // encoded_key.push_str(&self.key_prefix); - // encoded_key.push_str(&key_body); - // Cow::Owned(encoded_key) - // } - //``` - // and the return type changed to `Cow<'a, str>` + fn encode_key<'a>(&self, key: &'a StoreKey<'a>) -> Cow<'a, str> { let key_body = key.as_str(); - - let mut encoded_key = String::with_capacity(self.key_prefix.len() + key_body.len()); - encoded_key.push_str(&self.key_prefix); - encoded_key.push_str(&key_body); - - encoded_key + if self.key_prefix.is_empty() { + key_body + } else { + // This is in the hot path for all redis operations, so we try to reuse the allocation + // from `key.as_str()` if possible. + match key_body { + Cow::Owned(mut encoded_key) => { + encoded_key.insert_str(0, &self.key_prefix); + Cow::Owned(encoded_key) + } + Cow::Borrowed(body) => { + let mut encoded_key = String::with_capacity(self.key_prefix.len() + body.len()); + encoded_key.push_str(&self.key_prefix); + encoded_key.push_str(body); + Cow::Owned(encoded_key) + } + } + } } } #[async_trait] -impl StoreDriver for RedisStore -where - C: ConnectionLike + Clone + Send + Sync + Unpin + 'static, -{ +impl StoreDriver for RedisStore { async fn has_with_results( self: Pin<&Self>, keys: &[StoreKey<'_>], results: &mut [Option], ) -> Result<(), Error> { - if keys.len() == 1 && is_zero_digest(keys[0].borrow()) { - results[0] = Some(0); - return Ok(()); - } + // TODO(caass): Optimize for the case where `keys.len() == 1` + let pipeline = self.client_pool.next().pipeline(); - let mut zero_digest_indexes = Vec::new(); + results.iter_mut().for_each(|result| *result = None); - let queries = - keys.iter() - .enumerate() - .map(|(index, key)| { - if is_zero_digest(key.borrow()) { - zero_digest_indexes.push(index); - } - let encoded_key = self.encode_key(key.borrow()); + for (idx, key) in keys.iter().enumerate() { + // Don't bother with zero-length digests. + if is_zero_digest(key.borrow()) { + results[idx] = Some(0); + continue; + } - async { - let mut conn = self.get_conn().await.err_tip(|| { - "Error: Could not get connection handle in has_with_results" - })?; + let encoded_key = self.encode_key(key); - conn.strlen::<_, usize>(encoded_key) - .await - .map_err(from_redis_err) - .err_tip(|| "Error: Could not call strlen in has_with_results") - } - }) - .collect::>(); - - let digest_sizes = queries.try_collect::>().await?; + // This command is queued in memory, but not yet sent down the pipeline; the `await` returns instantly. + pipeline + .strlen::<(), _>(encoded_key.as_ref()) + .await + .err_tip(|| "In RedisStore::has_with_results")?; + } - error_if!( - digest_sizes.len() != results.len(), - "Mismatch in digest sizes and results length" - ); + // Send the queued commands. + let mut responses = pipeline.all::>().await?.into_iter(); + let mut remaining_results = results.iter_mut().filter(|option| { + // Anything that's `Some` was already set from `is_zero_digest`. + option.is_none() + }); - digest_sizes - .into_iter() - .zip(results.iter_mut()) - .for_each(|(size, result)| { - *result = if size == 0 { None } else { Some(size) }; - }); + // Similar to `Iterator::zip`, but with some verification at the end that the lengths were equal. + while let (Some(response), Some(result_slot)) = (responses.next(), remaining_results.next()) + { + if response == 0 { + // Redis returns 0 when the key doesn't exist AND when the key exists with value of length 0. + // Since we already checked zero-lengths with `is_zero_digest`, this means the value doesn't exist. + continue; + } - zero_digest_indexes.into_iter().for_each(|index| { - results[index] = Some(0); - }); + *result_slot = Some(response); + } - Ok(()) + if responses.next().is_some() { + Err(make_err!( + Code::Internal, + "Received more responses than expected in RedisStore::has_with_results" + )) + } else if remaining_results.next().is_some() { + Err(make_err!( + Code::Internal, + "Received fewer responses than expected in RedisStore::has_with_results" + )) + } else { + Ok(()) + } } async fn update( @@ -422,8 +217,7 @@ where mut reader: DropCloserReadHalf, _upload_size: UploadSizeInfo, ) -> Result<(), Error> { - let temp_key = OnceCell::new(); - let final_key = self.encode_key(key.borrow()); + let final_key = self.encode_key(&key); // While the name generation function can be supplied by the user, we need to have the curly // braces in place in order to manage redis' hashing behavior and make sure that the temporary @@ -433,25 +227,39 @@ where // The TL;DR is that if we're in cluster mode and the names hash differently, we can't use request // pipelining. By using these braces, we tell redis to only hash the part of the temporary key that's // identical to the final key -- so they will always hash to the same node. - // - // TODO(caass): the stabilization PR for [`LazyCell`](`std::cell::LazyCell`) has been merged into rust-lang, - // so in the next stable release we can use LazyCell::new(|| { ... }) instead. - let make_temp_name = || { - format!( - "temp-{}-{{{}}}", - (self.temp_name_generator_fn)(), - &final_key - ) - }; + let temp_key = format!( + "temp-{}-{{{}}}", + (self.temp_name_generator_fn)(), + &final_key + ); - let mut conn = self.get_conn().await?; - let mut pipe = redis::pipe(); - pipe.atomic(); + let client = self.client_pool.next(); + // This loop is a little confusing at first glance, but essentially the process is: + // - Get as much data from the reader as possible + // - When the reader is empty, but the writer isn't done sending data, write that data to redis + // - When the writer is done sending data, write the data and break from the loop + // + // At one extreme, we could append data in redis every time we read some bytes -- that is, make one TCP request + // per channel read. This is wasteful since we anticipate reading many small chunks of bytes from the reader. + // + // At the other extreme, we could make a single TCP request to write all of the data all at once. + // This could also be an issue if we read loads of data, since we'd send one massive TCP request + // rather than a few moderately-sized requests. + // + // To compromise, we buffer opportunistically -- when the reader doesn't have any data ready to read, but it's + // not done getting data, we flush the data we _have_ read to redis before waiting for the reader to get more. + // + // As a result of this, there will be a span of time where a key in Redis has only partial data. We want other + // observers to notice atomic updates to keys, rather than partial updates, so we first write to a temporary key + // and then rename that key once we're done appending data. + // + // TODO(caass): Remove potential for infinite loop (https://reviewable.io/reviews/TraceMachina/nativelink/1188#-O2pu9LV5ux4ILuT6MND) 'outer: loop { - let mut force_recv = true; + let mut expecting_first_chunk = true; + let pipe = client.pipeline(); - while force_recv || !reader.is_empty() { + while expecting_first_chunk || !reader.is_empty() { let chunk = reader .recv() .await @@ -461,49 +269,39 @@ where if is_zero_digest(key.borrow()) { return Ok(()); } - if force_recv { - conn.append(&final_key, &chunk[..]) - .await - .map_err(from_redis_err) - .err_tip(|| "In RedisStore::update() single chunk")?; - } + // Reader sent empty chunk, we're done here. break 'outer; } - pipe.cmd("APPEND") - .arg(temp_key.get_or_init(make_temp_name)) - .arg(&chunk[..]); - force_recv = false; + // Queue the append, but don't execute until we've received all the chunks. + pipe.append(&temp_key, chunk) + .await + .err_tip(|| "Failed to append to temp key in RedisStore::update")?; + expecting_first_chunk = false; // Give other tasks a chance to run to populate the reader's // buffer if possible. tokio::task::yield_now().await; } - pipe.query_async(&mut conn) + // Here the reader is empty but more data is expected. + // Executing the queued commands appends the data we just received to the temp key. + pipe.all() .await - .map_err(from_redis_err) - .err_tip(|| "In RedisStore::update::query_async")?; - - pipe.clear(); + .err_tip(|| "Failed to append to temporary key in RedisStore::update")?; } - pipe.cmd("RENAME") - .arg(temp_key.get_or_init(make_temp_name)) - .arg(&final_key); - - pipe.query_async(&mut conn) + // Rename the temp key so that the data appears under the real key. Any data already present in the real key is lost. + client + .rename(&temp_key, final_key.as_ref()) .await - .map_err(from_redis_err) - .err_tip(|| "In RedisStore::update")?; + .err_tip(|| "While renaming key in RedisStore::update()")?; + // If we have a publish channel configured, send a notice that the key has been set. if let Some(pub_sub_channel) = &self.pub_sub_channel { - conn.publish(pub_sub_channel, &final_key) - .await - .map_err(from_redis_err) - .err_tip(|| "Failed to publish temp key value to configured channel")? - } + client.publish(pub_sub_channel, final_key.as_ref()).await?; + }; Ok(()) } @@ -518,78 +316,77 @@ where // To follow RBE spec we need to consider any digest's with // zero size to be existing. if is_zero_digest(key.borrow()) { - writer + return writer .send_eof() - .err_tip(|| "Failed to send zero EOF in redis store get_part")?; - return Ok(()); + .err_tip(|| "Failed to send zero EOF in redis store get_part"); } - let mut conn = self.get_conn().await?; + let client = self.client_pool.next(); + let encoded_key = self.encode_key(&key); + let encoded_key = encoded_key.as_ref(); + if length == Some(0) { - let exists = conn - .exists::<_, bool>(self.encode_key(key.borrow())) + // We're supposed to read 0 bytes, so just check if the key exists. + let exists = client + .exists(encoded_key) .await - .map_err(from_redis_err) .err_tip(|| "In RedisStore::get_part::zero_exists")?; - if !exists { - return Err(make_err!( + + return match exists { + 0u8 => Err(make_err!( Code::NotFound, "Data not found in Redis store for digest: {key:?}" - )); - } - writer - .send_eof() - .err_tip(|| "Failed to write EOF in redis store get_part")?; - return Ok(()); + )), + 1 => writer + .send_eof() + .err_tip(|| "Failed to write EOF in redis store get_part"), + _ => unreachable!("only checked for existence of a single key"), + }; } - let mut current_start = isize::try_from(offset) - .err_tip(|| "Cannot convert offset to isize in RedisStore::get_part()")?; - let max_length = isize::try_from(length.unwrap_or(isize::MAX as usize)) - .err_tip(|| "Cannot convert length to isize in RedisStore::get_part()")?; - let end_position = current_start.saturating_add(max_length); + // N.B. the `-1`'s you see here are because redis GETRANGE is inclusive at both the start and end, so when we + // do math with indices we change them to be exclusive at the end. + + // We want to read the data at the key from `offset` to `offset + length`. + let data_start = offset; + let data_end = data_start.saturating_add(length.unwrap_or(isize::MAX as usize)) - 1; + + // And we don't ever want to read more than `READ_CHUNK_SIZE` bytes at a time, so we'll need to iterate. + let mut chunk_start = data_start; + let mut chunk_end = cmp::min(data_start.saturating_add(READ_CHUNK_SIZE) - 1, data_end); loop { - // Note: Redis getrange is inclusive, so we need to subtract 1 from the end. - let current_end = - std::cmp::min(current_start.saturating_add(READ_CHUNK_SIZE), end_position) - 1; - let chunk = conn - .getrange::<_, Bytes>(self.encode_key(key.borrow()), current_start, current_end) + let chunk: Bytes = client + .getrange(encoded_key, chunk_start, chunk_end) .await - .map_err(from_redis_err) .err_tip(|| "In RedisStore::get_part::getrange")?; - if chunk.is_empty() { - writer + let didnt_receive_full_chunk = chunk.len() < READ_CHUNK_SIZE; + let reached_end_of_data = chunk_end == data_end; + + if didnt_receive_full_chunk || reached_end_of_data { + if !chunk.is_empty() { + writer + .send(chunk) + .await + .err_tip(|| "Failed to write data in RedisStore::get_part")?; + } + + return writer .send_eof() - .err_tip(|| "Failed to write EOF in redis store get_part")?; - break; + .err_tip(|| "Failed to write EOF in redis store get_part"); } - // Note: Redis getrange is inclusive, so we need to add 1 to the end. - let was_partial_data = chunk.len() as isize != current_end - current_start + 1; - current_start += chunk.len() as isize; + // We received a full chunk's worth of data, so write it... writer .send(chunk) .await - .err_tip(|| "Failed to write data in Redis store")?; + .err_tip(|| "Failed to write data in RedisStore::get_part")?; - // If we got partial data or the exact requested number of bytes, we are done. - if writer.get_bytes_written() as isize == max_length || was_partial_data { - writer - .send_eof() - .err_tip(|| "Failed to write EOF in redis store get_part")?; - - break; - } - - error_if!( - writer.get_bytes_written() as isize > max_length, - "Data received exceeds requested length" - ); + // ...and go grab the next chunk. + chunk_start = chunk_end + 1; + chunk_end = cmp::min(chunk_start.saturating_add(READ_CHUNK_SIZE) - 1, data_end); } - - Ok(()) } fn inner_store(&self, _digest: Option) -> &dyn StoreDriver { @@ -610,10 +407,7 @@ where } #[async_trait] -impl HealthStatusIndicator for RedisStore -where - C: ConnectionLike + Clone + Send + Sync + Unpin + 'static, -{ +impl HealthStatusIndicator for RedisStore { fn get_name(&self) -> &'static str { "RedisStore" } @@ -622,7 +416,3 @@ where StoreDriver::check_health(Pin::new(self), namespace).await } } - -fn from_redis_err(call_res: redis::RedisError) -> Error { - make_err!(Code::Internal, "Redis Error: {call_res}") -} diff --git a/nativelink-store/tests/redis_store_test.rs b/nativelink-store/tests/redis_store_test.rs index a5c254c8a..98adce31c 100644 --- a/nativelink-store/tests/redis_store_test.rs +++ b/nativelink-store/tests/redis_store_test.rs @@ -12,25 +12,28 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; +use std::thread::panicking; + use bytes::Bytes; +use fred::bytes_utils::string::Str; +use fred::error::RedisError; +use fred::mocks::{MockCommand, Mocks}; +use fred::prelude::Builder; +use fred::types::{RedisConfig, RedisValue}; use nativelink_error::Error; use nativelink_macro::nativelink_test; use nativelink_store::cas_utils::ZERO_BYTE_DIGESTS; -use nativelink_store::redis_store::{BackgroundConnection, RedisStore}; +use nativelink_store::redis_store::{RedisStore, READ_CHUNK_SIZE}; use nativelink_util::buf_channel::make_buf_channel_pair; use nativelink_util::common::DigestInfo; use nativelink_util::store_trait::{StoreLike, UploadSizeInfo}; use pretty_assertions::assert_eq; -use redis::{Pipeline, RedisError}; -use redis_test::{IntoRedisValue, MockCmd, MockRedisConnection}; const VALID_HASH1: &str = "3031323334353637383961626364656630303030303030303030303030303030"; const TEMP_UUID: &str = "550e8400-e29b-41d4-a716-446655440000"; -type Command = str; -type Arg = str; -type RedisResult<'a> = Result<&'a [redis::Value], RedisError>; - fn mock_uuid_generator() -> String { uuid::Uuid::parse_str(TEMP_UUID).unwrap().to_string() } @@ -39,80 +42,149 @@ fn make_temp_key(final_name: &str) -> String { format!("temp-{TEMP_UUID}-{{{final_name}}}") } -struct MockRedisConnectionBuilder { - mock_cmds: Vec, +#[derive(Debug, Default)] +struct MockRedisBackend { + /// Commands we expect to encounter, and results we to return to the client. + // Commands are pushed from the back and popped from the front. + expected: Mutex)>>, } -impl MockRedisConnectionBuilder { +impl MockRedisBackend { fn new() -> Self { - MockRedisConnectionBuilder { mock_cmds: vec![] } + Self::default() } - fn pipe(mut self, inputs: &[(&Command, &[&Arg], RedisResult)]) -> Self { - let mut pipe = Pipeline::new(); - pipe.atomic(); - let mut res_vec = vec![]; - for (cmd, args, result) in inputs { - let mut command = redis::cmd(cmd); - for arg in args.iter() { - command.arg(arg); - } - for res in result.as_ref().unwrap().iter() { - res_vec.push(res.clone()); - } - pipe.add_command(command); - } - self.mock_cmds.push(MockCmd::with_values(pipe, Ok(res_vec))); + fn expect(&self, command: MockCommand, result: Result) -> &Self { + self.expected.lock().unwrap().push_back((command, result)); self } +} - fn cmd( - mut self, - cmd: &Command, - args: &[&Arg], - result: Result, - ) -> Self { - let mut cmd = redis::cmd(cmd); - for arg in args { - cmd.arg(arg); - } - self.mock_cmds.push(MockCmd::new(cmd, result)); - self +impl Mocks for MockRedisBackend { + fn process_command(&self, actual: MockCommand) -> Result { + let Some((expected, result)) = self.expected.lock().unwrap().pop_front() else { + // panic here -- this isn't a redis error, it's a test failure + panic!("Didn't expect any more commands, but received {actual:?}"); + }; + + assert_eq!(actual, expected); + + result } - fn build(self) -> MockRedisConnection { - MockRedisConnection::new(self.mock_cmds) + fn process_transaction(&self, commands: Vec) -> Result { + static MULTI: MockCommand = MockCommand { + cmd: Str::from_static("MULTI"), + subcommand: None, + args: Vec::new(), + }; + static EXEC: MockCommand = MockCommand { + cmd: Str::from_static("EXEC"), + subcommand: None, + args: Vec::new(), + }; + + let results = std::iter::once(MULTI.clone()) + .chain(commands) + .chain([EXEC.clone()]) + .map(|command| self.process_command(command)) + .collect::, RedisError>>()?; + + Ok(RedisValue::Array(results)) } } -type MockRedisStore = RedisStore; +impl Drop for MockRedisBackend { + fn drop(&mut self) { + if panicking() { + // We're already panicking, let's make debugging easier and let future devs solve problems one at a time. + return; + } + + let expected = self.expected.get_mut().unwrap(); + + if expected.is_empty() { + return; + } + + assert_eq!( + *expected, + VecDeque::new(), + "Didn't receive all expected commands." + ); + + // Panicking isn't enough inside a tokio task, we need to `exit(1)` + std::process::exit(1) + } +} #[nativelink_test] async fn upload_and_get_data() -> Result<(), Error> { + // Construct the data we want to send. Since it's small, we expect it to be sent in a single chunk. let data = Bytes::from_static(b"14"); + let chunk_data = RedisValue::Bytes(data.clone()); + // Construct a digest for our data and create a key based on that digest. let digest = DigestInfo::try_new(VALID_HASH1, 2)?; let packed_hash_hex = format!("{}-{}", digest.hash_str(), digest.size_bytes); - let temp_key = make_temp_key(&packed_hash_hex); - - let chunk_data = "14"; - - let redis_connection = MockRedisConnectionBuilder::new() - .pipe(&[("APPEND", &[&temp_key, chunk_data], Ok(&[redis::Value::Nil]))]) - .cmd("APPEND", &[&packed_hash_hex, ""], Ok("")) - .pipe(&[( - "RENAME", - &[&temp_key, &packed_hash_hex], - Ok(&[redis::Value::Nil]), - )]) - .cmd("STRLEN", &[&packed_hash_hex], Ok(redis::Value::Int(2))) - .cmd("GETRANGE", &[&packed_hash_hex, "0", "1"], Ok("14")) - .build(); - - let store = MockRedisStore::new_with_conn_and_name_generator( - BackgroundConnection::with_initializer(async move { Ok::<_, Error>(redis_connection) }), - mock_uuid_generator, - ); + + // Construct our Redis store with a mocked out backend. + let temp_key = RedisValue::Bytes(make_temp_key(&packed_hash_hex).into()); + let real_key = RedisValue::Bytes(packed_hash_hex.into()); + + let mocks = Arc::new(MockRedisBackend::new()); + + // The first set of commands are for setting the data. + mocks + // Append the real value to the temp key. + .expect( + MockCommand { + cmd: Str::from_static("APPEND"), + subcommand: None, + args: vec![temp_key.clone(), chunk_data], + }, + Ok(RedisValue::Array(vec![RedisValue::Null])), + ) + // Move the data from the fake key to the real key. + .expect( + MockCommand { + cmd: Str::from_static("RENAME"), + subcommand: None, + args: vec![temp_key, real_key.clone()], + }, + Ok(RedisValue::Array(vec![RedisValue::Null])), + ); + + // The second set of commands are for retrieving the data from the key. + mocks + // Check that the key exists. + .expect( + MockCommand { + cmd: Str::from_static("STRLEN"), + subcommand: None, + args: vec![real_key.clone()], + }, + Ok(RedisValue::Integer(2)), + ) + // Retrieve the data from the real key. + .expect( + MockCommand { + cmd: Str::from_static("GETRANGE"), + subcommand: None, + args: vec![real_key, RedisValue::Integer(0), RedisValue::Integer(1)], + }, + Ok(RedisValue::String(Str::from_static("14"))), + ); + + let store = { + let mut builder = Builder::default_centralized(); + builder.set_config(RedisConfig { + mocks: Some(Arc::clone(&mocks) as Arc), + ..Default::default() + }); + + RedisStore::new_from_builder_and_parts(builder, None, mock_uuid_generator, String::new())? + }; store.update_oneshot(digest, data.clone()).await?; @@ -123,7 +195,7 @@ async fn upload_and_get_data() -> Result<(), Error> { ); let result = store - .get_part_unchunked(digest, 0, Some(data.clone().len())) + .get_part_unchunked(digest, 0, Some(data.len())) .await?; assert_eq!(result, data, "Expected redis store to have updated value",); @@ -134,32 +206,65 @@ async fn upload_and_get_data() -> Result<(), Error> { #[nativelink_test] async fn upload_and_get_data_with_prefix() -> Result<(), Error> { let data = Bytes::from_static(b"14"); + let chunk_data = RedisValue::Bytes(data.clone()); + let prefix = "TEST_PREFIX-"; let digest = DigestInfo::try_new(VALID_HASH1, 2)?; let packed_hash_hex = format!("{prefix}{}-{}", digest.hash_str(), digest.size_bytes); - let temp_key = make_temp_key(&packed_hash_hex); - - let chunk_data = "14"; - - let redis_connection = MockRedisConnectionBuilder::new() - .pipe(&[("APPEND", &[&temp_key, chunk_data], Ok(&[redis::Value::Nil]))]) - .cmd("APPEND", &[&packed_hash_hex, ""], Ok("")) - .pipe(&[( - "RENAME", - &[&temp_key, &packed_hash_hex], - Ok(&[redis::Value::Nil]), - )]) - .cmd("STRLEN", &[&packed_hash_hex], Ok(redis::Value::Int(2))) - .cmd("GETRANGE", &[&packed_hash_hex, "0", "1"], Ok("14")) - .build(); - - let store = MockRedisStore::new_with_conn_and_name_generator_and_prefix( - BackgroundConnection::with_initializer(async move { Ok::<_, Error>(redis_connection) }), - mock_uuid_generator, - None, - prefix.to_string(), - ); + + let temp_key = RedisValue::Bytes(make_temp_key(&packed_hash_hex).into()); + let real_key = RedisValue::Bytes(packed_hash_hex.into()); + + let mocks = Arc::new(MockRedisBackend::new()); + mocks + .expect( + MockCommand { + cmd: Str::from_static("APPEND"), + subcommand: None, + args: vec![temp_key.clone(), chunk_data], + }, + Ok(RedisValue::Array(vec![RedisValue::Null])), + ) + .expect( + MockCommand { + cmd: Str::from_static("RENAME"), + subcommand: None, + args: vec![temp_key, real_key.clone()], + }, + Ok(RedisValue::Array(vec![RedisValue::Null])), + ) + .expect( + MockCommand { + cmd: Str::from_static("STRLEN"), + subcommand: None, + args: vec![real_key.clone()], + }, + Ok(RedisValue::Integer(2)), + ) + .expect( + MockCommand { + cmd: Str::from_static("GETRANGE"), + subcommand: None, + args: vec![real_key, RedisValue::Integer(0), RedisValue::Integer(1)], + }, + Ok(RedisValue::String(Str::from_static("14"))), + ); + + let store = { + let mut builder = Builder::default_centralized(); + builder.set_config(RedisConfig { + mocks: Some(Arc::clone(&mocks) as Arc), + ..Default::default() + }); + + RedisStore::new_from_builder_and_parts( + builder, + None, + mock_uuid_generator, + prefix.to_string(), + )? + }; store.update_oneshot(digest, data.clone()).await?; @@ -170,7 +275,7 @@ async fn upload_and_get_data_with_prefix() -> Result<(), Error> { ); let result = store - .get_part_unchunked(digest, 0, Some(data.clone().len())) + .get_part_unchunked(digest, 0, Some(data.len())) .await?; assert_eq!(result, data, "Expected redis store to have updated value",); @@ -181,15 +286,15 @@ async fn upload_and_get_data_with_prefix() -> Result<(), Error> { #[nativelink_test] async fn upload_empty_data() -> Result<(), Error> { let data = Bytes::from_static(b""); - let digest = ZERO_BYTE_DIGESTS[0]; - let redis_connection = MockRedisConnectionBuilder::new().build(); - - let store = MockRedisStore::new_with_conn_and_name_generator( - BackgroundConnection::with_initializer(async move { Ok::<_, Error>(redis_connection) }), + // We expect to skip both uploading and downloading when the digest is known zero. + let store = RedisStore::new_from_builder_and_parts( + Builder::default_centralized(), + None, mock_uuid_generator, - ); + String::new(), + )?; store.update_oneshot(digest, data).await?; @@ -205,18 +310,15 @@ async fn upload_empty_data() -> Result<(), Error> { #[nativelink_test] async fn upload_empty_data_with_prefix() -> Result<(), Error> { let data = Bytes::from_static(b""); - let prefix = "TEST_PREFIX-"; - let digest = ZERO_BYTE_DIGESTS[0]; + let prefix = "TEST_PREFIX-"; - let redis_connection = MockRedisConnectionBuilder::new().build(); - - let store = MockRedisStore::new_with_conn_and_name_generator_and_prefix( - BackgroundConnection::with_initializer(async move { Ok::<_, Error>(redis_connection) }), - mock_uuid_generator, + let store = RedisStore::new_from_builder_and_parts( + Builder::default_centralized(), None, + mock_uuid_generator, prefix.to_string(), - ); + )?; store.update_oneshot(digest, data).await?; @@ -230,49 +332,80 @@ async fn upload_empty_data_with_prefix() -> Result<(), Error> { } #[nativelink_test] -async fn test_uploading_large_data() -> Result<(), Error> { - // Requires multiple chunks as data is larger than 64K - let data: Bytes = Bytes::from(vec![0u8; 65 * 1024]); +async fn test_large_downloads_are_chunked() -> Result<(), Error> { + // Requires multiple chunks as data is larger than 64K. + let data = Bytes::from(vec![0u8; READ_CHUNK_SIZE + 128]); let digest = DigestInfo::try_new(VALID_HASH1, 1)?; let packed_hash_hex = format!("{}-{}", digest.hash_str(), digest.size_bytes); - let temp_key = make_temp_key(&packed_hash_hex); - - let chunk_data = std::str::from_utf8(&data).unwrap().to_string(); - - let redis_connection = MockRedisConnectionBuilder::new() - .pipe(&[( - "APPEND", - &[&temp_key, &chunk_data], - Ok(&[redis::Value::Nil]), - )]) - .cmd( - "APPEND", - &[&packed_hash_hex, ""], - Ok(hex::encode(&data[..]).as_str()), + + let temp_key = RedisValue::Bytes(make_temp_key(&packed_hash_hex).into()); + let real_key = RedisValue::Bytes(packed_hash_hex.into()); + + let mocks = Arc::new(MockRedisBackend::new()); + + mocks + .expect( + MockCommand { + cmd: Str::from_static("APPEND"), + subcommand: None, + args: vec![temp_key.clone(), data.clone().into()], + }, + Ok(RedisValue::Array(vec![RedisValue::Null])), ) - .pipe(&[( - "RENAME", - &[&temp_key, &packed_hash_hex], - Ok(&[redis::Value::Nil]), - )]) - .cmd("STRLEN", &[&packed_hash_hex], Ok(redis::Value::Int(2))) - .cmd( - "GETRANGE", - &[&packed_hash_hex, "0", "65535"], - Ok(hex::encode(&data[..]).as_str()), + .expect( + MockCommand { + cmd: Str::from_static("RENAME"), + subcommand: None, + args: vec![temp_key, real_key.clone()], + }, + Ok(RedisValue::Array(vec![RedisValue::Null])), ) - .cmd( - "GETRANGE", - &[&packed_hash_hex, "65535", "65560"], - Ok(hex::encode(&data[..]).as_str()), + .expect( + MockCommand { + cmd: Str::from_static("STRLEN"), + subcommand: None, + args: vec![real_key.clone()], + }, + Ok(RedisValue::Integer(data.len().try_into().unwrap())), ) - .build(); - - let store = MockRedisStore::new_with_conn_and_name_generator( - BackgroundConnection::with_initializer(async move { Ok::<_, Error>(redis_connection) }), - mock_uuid_generator, - ); + .expect( + MockCommand { + cmd: Str::from_static("GETRANGE"), + subcommand: None, + args: vec![ + real_key.clone(), + RedisValue::Integer(0), + // We expect to be asked for data from `0..READ_CHUNK_SIZE`, but since GETRANGE is inclusive + // the actual call should be from `0..=(READ_CHUNK_SIZE - 1)`. + RedisValue::Integer(READ_CHUNK_SIZE as i64 - 1), + ], + }, + Ok(RedisValue::Bytes(data.slice(..READ_CHUNK_SIZE))), + ) + .expect( + MockCommand { + cmd: Str::from_static("GETRANGE"), + subcommand: None, + args: vec![ + real_key, + RedisValue::Integer(READ_CHUNK_SIZE as i64), + // Similar GETRANCE index shenanigans here. + RedisValue::Integer(data.len() as i64 - 1), + ], + }, + Ok(RedisValue::Bytes(data.slice(READ_CHUNK_SIZE..))), + ); + + let store = { + let mut builder = Builder::default_centralized(); + builder.set_config(RedisConfig { + mocks: Some(Arc::clone(&mocks) as Arc), + ..Default::default() + }); + + RedisStore::new_from_builder_and_parts(builder, None, mock_uuid_generator, String::new())? + }; store.update_oneshot(digest, data.clone()).await?; @@ -287,8 +420,8 @@ async fn test_uploading_large_data() -> Result<(), Error> { .await?; assert_eq!( - hex::encode(get_result).len(), - hex::encode(data.clone()).len(), + get_result, + data.clone(), "Expected redis store to have updated value", ); @@ -303,39 +436,64 @@ async fn yield_between_sending_packets_in_update() -> Result<(), Error> { let digest = DigestInfo::try_new(VALID_HASH1, 2)?; let packed_hash_hex = format!("{}-{}", digest.hash_str(), digest.size_bytes); - let temp_key = make_temp_key(&packed_hash_hex); - - let redis_connection = MockRedisConnectionBuilder::new() - .pipe(&[ - ( - "APPEND", - &[&temp_key, std::str::from_utf8(&data_p1).unwrap()], - Ok(&[redis::Value::Nil]), - ), - ( - "APPEND", - &[&temp_key, std::str::from_utf8(&data_p2).unwrap()], - Ok(&[redis::Value::Nil]), - ), - ]) - .cmd("APPEND", &[&packed_hash_hex, ""], Ok("")) - .pipe(&[( - "RENAME", - &[&temp_key, &packed_hash_hex], - Ok(&[redis::Value::Nil]), - )]) - .cmd("STRLEN", &[&packed_hash_hex], Ok(redis::Value::Int(2))) - .cmd( - "GETRANGE", - &[&packed_hash_hex, "0", "10239"], - Ok(std::str::from_utf8(&data).unwrap()), - ) - .build(); - let store = MockRedisStore::new_with_conn_and_name_generator( - BackgroundConnection::with_initializer(async move { Ok::<_, Error>(redis_connection) }), - mock_uuid_generator, - ); + let temp_key = RedisValue::Bytes(make_temp_key(&packed_hash_hex).into()); + let real_key = RedisValue::Bytes(packed_hash_hex.into()); + + let mocks = Arc::new(MockRedisBackend::new()); + mocks + // We expect multiple `"APPEND"`s as we send data in multiple chunks + .expect( + MockCommand { + cmd: Str::from_static("APPEND"), + subcommand: None, + args: vec![temp_key.clone(), data_p1.clone().into()], + }, + Ok(RedisValue::Array(vec![RedisValue::Null])), + ) + .expect( + MockCommand { + cmd: Str::from_static("APPEND"), + subcommand: None, + args: vec![temp_key.clone(), data_p2.clone().into()], + }, + Ok(RedisValue::Array(vec![RedisValue::Null])), + ) + // The rest of the process looks the same. + .expect( + MockCommand { + cmd: Str::from_static("RENAME"), + subcommand: None, + args: vec![temp_key, real_key.clone()], + }, + Ok(RedisValue::Array(vec![RedisValue::Null])), + ) + .expect( + MockCommand { + cmd: Str::from_static("STRLEN"), + subcommand: None, + args: vec![real_key.clone()], + }, + Ok(RedisValue::Integer(2)), + ) + .expect( + MockCommand { + cmd: Str::from_static("GETRANGE"), + subcommand: None, + args: vec![real_key, RedisValue::Integer(0), RedisValue::Integer(10239)], + }, + Ok(RedisValue::Bytes(data.clone())), + ); + + let store = { + let mut builder = Builder::default_centralized(); + builder.set_config(RedisConfig { + mocks: Some(Arc::clone(&mocks) as Arc), + ..Default::default() + }); + + RedisStore::new_from_builder_and_parts(builder, None, mock_uuid_generator, String::new())? + }; let (mut tx, rx) = make_buf_channel_pair(); tx.send(data_p1).await?;