From 7609aec24bf2388a995316d987ab2b6b53f51b5b Mon Sep 17 00:00:00 2001 From: Blaise Bruer Date: Fri, 11 Oct 2024 13:01:53 -0500 Subject: [PATCH] Remove wildcard searching in redis scheduler Redis does not use a b-tree under the hood, instead it aggregates the key space, then does a union on all documents in the database. Thus, redis has a limit on how many items are unioned (MAXEXPANSIONS); by default redis has this set to 200. This limitation causes issues if there ever becomes more than 200 scheduled tasks, so we get around this by making the sort and filter index separate. Now we search explicitly for exact match state, then sort on a different key. --- .../src/store_awaited_action_db.rs | 32 ++++---- .../redis_store_awaited_action_db_test.rs | 19 ++--- nativelink-store/src/redis_store.rs | 75 ++++++++++++------- nativelink-util/src/store_trait.rs | 5 +- 4 files changed, 78 insertions(+), 53 deletions(-) diff --git a/nativelink-scheduler/src/store_awaited_action_db.rs b/nativelink-scheduler/src/store_awaited_action_db.rs index b6ef92b3b..88708bcee 100644 --- a/nativelink-scheduler/src/store_awaited_action_db.rs +++ b/nativelink-scheduler/src/store_awaited_action_db.rs @@ -193,7 +193,7 @@ impl SchedulerIndexProvider for SearchUniqueQualifierToAwaitedAction<'_> { const KEY_PREFIX: &'static str = OPERATION_ID_TO_AWAITED_ACTION_KEY_PREFIX; const INDEX_NAME: &'static str = "unique_qualifier"; type Versioned = TrueValue; - fn index_value_prefix(&self) -> Cow<'_, str> { + fn index_value(&self) -> Cow<'_, str> { Cow::Owned(format!("{}", self.0)) } } @@ -204,16 +204,17 @@ impl SchedulerStoreDecodeTo for SearchUniqueQualifierToAwaitedAction<'_> { } } -struct SearchSortKeyPrefixToAwaitedAction(&'static str); -impl SchedulerIndexProvider for SearchSortKeyPrefixToAwaitedAction { +struct SearchStateToAwaitedAction(&'static str); +impl SchedulerIndexProvider for SearchStateToAwaitedAction { const KEY_PREFIX: &'static str = OPERATION_ID_TO_AWAITED_ACTION_KEY_PREFIX; - const INDEX_NAME: &'static str = "sort_key"; + const INDEX_NAME: &'static str = "state"; + const MAYBE_SORT_KEY: Option<&'static str> = Some("sort_key"); type Versioned = TrueValue; - fn index_value_prefix(&self) -> Cow<'_, str> { + fn index_value(&self) -> Cow<'_, str> { Cow::Borrowed(self.0) } } -impl SchedulerStoreDecodeTo for SearchSortKeyPrefixToAwaitedAction { +impl SchedulerStoreDecodeTo for SearchStateToAwaitedAction { type DecodeOutput = AwaitedAction; fn decode(version: u64, data: Bytes) -> Result { awaited_action_decode(version, &data) @@ -222,10 +223,10 @@ impl SchedulerStoreDecodeTo for SearchSortKeyPrefixToAwaitedAction { fn get_state_prefix(state: SortedAwaitedActionState) -> &'static str { match state { - SortedAwaitedActionState::CacheCheck => "x_", - SortedAwaitedActionState::Queued => "q_", - SortedAwaitedActionState::Executing => "e_", - SortedAwaitedActionState::Completed => "c_", + SortedAwaitedActionState::CacheCheck => "cache_check", + SortedAwaitedActionState::Queued => "queued", + SortedAwaitedActionState::Executing => "executing", + SortedAwaitedActionState::Completed => "completed", } } @@ -263,14 +264,15 @@ impl SchedulerStoreDataProvider for UpdateOperationIdToAwaitedAction { { let state = SortedAwaitedActionState::try_from(&self.0.state().stage) .err_tip(|| "In UpdateOperationIdToAwaitedAction::get_index")?; + output.push(("state", Bytes::from(get_state_prefix(state)))); let sorted_awaited_action = SortedAwaitedAction::from(&self.0); output.push(( "sort_key", + // We encode to hex to ensure that the sort key is lexicographically sorted. Bytes::from(format!( - "{}{}", + "{}{:016x}", get_state_prefix(state), - sorted_awaited_action.sort_key.as_u64(), - )), + sorted_awaited_action.sort_key.as_u64())), )); } Ok(output) @@ -534,7 +536,7 @@ impl OperationId + Send + Sync + Unpin + 'static> } Ok(self .store - .search_by_index_prefix(SearchSortKeyPrefixToAwaitedAction(get_state_prefix(state))) + .search_by_index_prefix(SearchStateToAwaitedAction(get_state_prefix(state))) .await .err_tip(|| "In RedisAwaitedActionDb::get_range_of_actions")? .map_ok(move |awaited_action| { @@ -552,7 +554,7 @@ impl OperationId + Send + Sync + Unpin + 'static> ) -> Result>, Error> { Ok(self .store - .search_by_index_prefix(SearchSortKeyPrefixToAwaitedAction("")) + .search_by_index_prefix(SearchStateToAwaitedAction("")) .await .err_tip(|| "In RedisAwaitedActionDb::get_range_of_actions")? .map_ok(move |awaited_action| { diff --git a/nativelink-scheduler/tests/redis_store_awaited_action_db_test.rs b/nativelink-scheduler/tests/redis_store_awaited_action_db_test.rs index 00709cb2e..c7f761c70 100644 --- a/nativelink-scheduler/tests/redis_store_awaited_action_db_test.rs +++ b/nativelink-scheduler/tests/redis_store_awaited_action_db_test.rs @@ -194,16 +194,14 @@ async fn add_action_smoke_test() -> Result<(), Error> { const SUB_CHANNEL: &str = "sub_channel"; let ft_aggregate_args = vec![ - format!("aa__unique_qualifier_{SCRIPT_VERSION}").into(), - format!("@unique_qualifier:{{ {INSTANCE_NAME}_SHA256_0000000000000000000000000000000000000000000000000000000000000000_0_c* }}").into(), + format!("aa__unique_qualifier__{SCRIPT_VERSION}").into(), + format!("@unique_qualifier:{{ {INSTANCE_NAME}_SHA256_0000000000000000000000000000000000000000000000000000000000000000_0_c }}").into(), "LOAD".into(), 2.into(), "data".into(), "version".into(), "SORTBY".into(), - 2.into(), - "@unique_qualifier".into(), - "ASC".into(), + 0.into(), "WITHCURSOR".into(), "COUNT".into(), 256.into(), @@ -239,7 +237,7 @@ async fn add_action_smoke_test() -> Result<(), Error> { cmd: Str::from_static("FT.CREATE"), subcommand: None, args: vec![ - format!("aa__unique_qualifier_{SCRIPT_VERSION}").into(), + format!("aa__unique_qualifier__{SCRIPT_VERSION}").into(), "ON".into(), "HASH".into(), "PREFIX".into(), @@ -254,7 +252,6 @@ async fn add_action_smoke_test() -> Result<(), Error> { "SCHEMA".into(), "unique_qualifier".into(), "TAG".into(), - "SORTABLE".into(), ], }, Ok(RedisValue::Bytes(Bytes::from("data"))), @@ -286,8 +283,10 @@ async fn add_action_smoke_test() -> Result<(), Error> { RedisValue::Bytes(Bytes::from(serde_json::to_string(&worker_awaited_action).unwrap())), "unique_qualifier".as_bytes().into(), format!("{INSTANCE_NAME}_SHA256_0000000000000000000000000000000000000000000000000000000000000000_0_c").as_bytes().into(), + "state".as_bytes().into(), + "queued".as_bytes().into(), "sort_key".as_bytes().into(), - "q_9223372041149743103".as_bytes().into(), + "80000000ffffffff".as_bytes().into(), ], }, Ok(1.into() /* New version */), @@ -360,8 +359,10 @@ async fn add_action_smoke_test() -> Result<(), Error> { RedisValue::Bytes(Bytes::from(serde_json::to_string(&new_awaited_action).unwrap())), "unique_qualifier".as_bytes().into(), format!("{INSTANCE_NAME}_SHA256_0000000000000000000000000000000000000000000000000000000000000000_0_c").as_bytes().into(), + "state".as_bytes().into(), + "executing".as_bytes().into(), "sort_key".as_bytes().into(), - "e_9223372041149743103".as_bytes().into(), + "80000000ffffffff".as_bytes().into(), ], }, Ok(2.into() /* New version */), diff --git a/nativelink-store/src/redis_store.rs b/nativelink-store/src/redis_store.rs index 80db12341..8b337fc38 100644 --- a/nativelink-store/src/redis_store.rs +++ b/nativelink-store/src/redis_store.rs @@ -681,11 +681,12 @@ const fn fingerprint_create_index_template() -> u32 { /// This will add some prefix data to the name to try and ensure /// if the index definition changes, the name will get a new name. macro_rules! get_index_name { - ($prefix:expr, $field:expr) => { + ($prefix:expr, $field:expr, $maybe_sort:expr) => { format_args!( - "{}_{}_{:08x}", + "{}_{}_{}_{:08x}", $prefix, $field, + $maybe_sort.unwrap_or(""), fingerprint_create_index_template(), ) }; @@ -1026,19 +1027,20 @@ impl SchedulerStore for RedisStore { where K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send, { - let index_value_prefix = index.index_value_prefix(); + let index_value = index.index_value(); let run_ft_aggregate = || { let client = self.client_pool.next().clone(); - let sanitized_field = try_sanitize(index_value_prefix.as_ref()).err_tip(|| { - format!( - "In RedisStore::search_by_index_prefix::try_sanitize - {index_value_prefix:?}" - ) + let sanitized_field = try_sanitize(index_value.as_ref()).err_tip(|| { + format!("In RedisStore::search_by_index_prefix::try_sanitize - {index_value:?}") })?; Ok::<_, Error>(async move { ft_aggregate( client, - format!("{}", get_index_name!(K::KEY_PREFIX, K::INDEX_NAME)), - format!("@{}:{{ {}* }}", K::INDEX_NAME, sanitized_field), + format!( + "{}", + get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY) + ), + format!("@{}:{{ {} }}", K::INDEX_NAME, sanitized_field), fred::types::FtAggregateOptions { load: Some(fred::types::Load::Some(vec![ fred::types::SearchField { @@ -1055,10 +1057,9 @@ impl SchedulerStore for RedisStore { max_idle: Some(CURSOR_IDLE_MS), }), pipeline: vec![fred::types::AggregateOperation::SortBy { - properties: vec![( - format!("@{}", K::INDEX_NAME).into(), - fred::types::SortOrder::Asc, - )], + properties: K::MAYBE_SORT_KEY.map_or_else(Vec::new, |v| { + vec![(format!("@{v}").into(), fred::types::SortOrder::Asc)] + }), max: None, }], ..Default::default() @@ -1069,11 +1070,40 @@ impl SchedulerStore for RedisStore { }; let stream = run_ft_aggregate()? .or_else(|_| async move { + let mut schema = vec![SearchSchema { + field_name: K::INDEX_NAME.into(), + alias: None, + kind: SearchSchemaKind::Tag { + sortable: false, + unf: false, + separator: None, + casesensitive: false, + withsuffixtrie: false, + noindex: false, + }, + }]; + if let Some(sort_key) = K::MAYBE_SORT_KEY { + schema.push(SearchSchema { + field_name: sort_key.into(), + alias: None, + kind: SearchSchemaKind::Tag { + sortable: true, + unf: false, + separator: None, + casesensitive: false, + withsuffixtrie: false, + noindex: false, + }, + }); + } let create_result = self .client_pool .next() .ft_create::<(), _>( - format!("{}", get_index_name!(K::KEY_PREFIX, K::INDEX_NAME)), + format!( + "{}", + get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY) + ), FtCreateOptions { on: Some(fred::types::IndexKind::Hash), prefixes: vec![K::KEY_PREFIX.into()], @@ -1084,30 +1114,19 @@ impl SchedulerStore for RedisStore { temporary: Some(INDEX_TTL_S), ..Default::default() }, - vec![SearchSchema { - field_name: K::INDEX_NAME.into(), - alias: None, - kind: SearchSchemaKind::Tag { - sortable: true, - unf: false, - separator: None, - casesensitive: false, - withsuffixtrie: false, - noindex: false, - }, - }], + schema, ) .await .err_tip(|| { format!( "Error with ft_create in RedisStore::search_by_index_prefix({})", - get_index_name!(K::KEY_PREFIX, K::INDEX_NAME), + get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY), ) }); let run_result = run_ft_aggregate()?.await.err_tip(|| { format!( "Error with second ft_aggregate in RedisStore::search_by_index_prefix({})", - get_index_name!(K::KEY_PREFIX, K::INDEX_NAME), + get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY), ) }); // Creating the index will race which is ok. If it fails to create, we only diff --git a/nativelink-util/src/store_trait.rs b/nativelink-util/src/store_trait.rs index 2c4f69154..63c3671ca 100644 --- a/nativelink-util/src/store_trait.rs +++ b/nativelink-util/src/store_trait.rs @@ -855,11 +855,14 @@ pub trait SchedulerIndexProvider { /// The name of the index. const INDEX_NAME: &'static str; + /// The sort key for the index (if any). + const MAYBE_SORT_KEY: Option<&'static str> = None; + /// If the data is versioned. type Versioned: BoolValue; /// The value of the index. - fn index_value_prefix(&self) -> Cow<'_, str>; + fn index_value(&self) -> Cow<'_, str>; } /// Provides a key to lookup data in the store.