Skip to content

Commit

Permalink
Remove wildcard searching in redis scheduler
Browse files Browse the repository at this point in the history
Redis does not use a b-tree under the hood, instead it aggregates the
key space, then does a union on all documents in the database. Thus,
redis has a limit on how many items are unioned (MAXEXPANSIONS); by
default redis has this set to 200. This limitation causes issues if
there ever becomes more than 200 scheduled tasks, so we get around this
by making the sort and filter index separate. Now we search explicitly
for exact match state, then sort on a different key.
  • Loading branch information
allada committed Oct 22, 2024
1 parent a815ba0 commit 7609aec
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 53 deletions.
32 changes: 17 additions & 15 deletions nativelink-scheduler/src/store_awaited_action_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl SchedulerIndexProvider for SearchUniqueQualifierToAwaitedAction<'_> {
const KEY_PREFIX: &'static str = OPERATION_ID_TO_AWAITED_ACTION_KEY_PREFIX;
const INDEX_NAME: &'static str = "unique_qualifier";
type Versioned = TrueValue;
fn index_value_prefix(&self) -> Cow<'_, str> {
fn index_value(&self) -> Cow<'_, str> {
Cow::Owned(format!("{}", self.0))
}
}
Expand All @@ -204,16 +204,17 @@ impl SchedulerStoreDecodeTo for SearchUniqueQualifierToAwaitedAction<'_> {
}
}

struct SearchSortKeyPrefixToAwaitedAction(&'static str);
impl SchedulerIndexProvider for SearchSortKeyPrefixToAwaitedAction {
struct SearchStateToAwaitedAction(&'static str);
impl SchedulerIndexProvider for SearchStateToAwaitedAction {
const KEY_PREFIX: &'static str = OPERATION_ID_TO_AWAITED_ACTION_KEY_PREFIX;
const INDEX_NAME: &'static str = "sort_key";
const INDEX_NAME: &'static str = "state";
const MAYBE_SORT_KEY: Option<&'static str> = Some("sort_key");
type Versioned = TrueValue;
fn index_value_prefix(&self) -> Cow<'_, str> {
fn index_value(&self) -> Cow<'_, str> {
Cow::Borrowed(self.0)
}
}
impl SchedulerStoreDecodeTo for SearchSortKeyPrefixToAwaitedAction {
impl SchedulerStoreDecodeTo for SearchStateToAwaitedAction {
type DecodeOutput = AwaitedAction;
fn decode(version: u64, data: Bytes) -> Result<Self::DecodeOutput, Error> {
awaited_action_decode(version, &data)
Expand All @@ -222,10 +223,10 @@ impl SchedulerStoreDecodeTo for SearchSortKeyPrefixToAwaitedAction {

fn get_state_prefix(state: SortedAwaitedActionState) -> &'static str {
match state {
SortedAwaitedActionState::CacheCheck => "x_",
SortedAwaitedActionState::Queued => "q_",
SortedAwaitedActionState::Executing => "e_",
SortedAwaitedActionState::Completed => "c_",
SortedAwaitedActionState::CacheCheck => "cache_check",
SortedAwaitedActionState::Queued => "queued",
SortedAwaitedActionState::Executing => "executing",
SortedAwaitedActionState::Completed => "completed",
}
}

Expand Down Expand Up @@ -263,14 +264,15 @@ impl SchedulerStoreDataProvider for UpdateOperationIdToAwaitedAction {
{
let state = SortedAwaitedActionState::try_from(&self.0.state().stage)
.err_tip(|| "In UpdateOperationIdToAwaitedAction::get_index")?;
output.push(("state", Bytes::from(get_state_prefix(state))));
let sorted_awaited_action = SortedAwaitedAction::from(&self.0);
output.push((
"sort_key",
// We encode to hex to ensure that the sort key is lexicographically sorted.
Bytes::from(format!(
"{}{}",
"{}{:016x}",
get_state_prefix(state),
sorted_awaited_action.sort_key.as_u64(),
)),
sorted_awaited_action.sort_key.as_u64())),
));
}
Ok(output)
Expand Down Expand Up @@ -534,7 +536,7 @@ impl<S: SchedulerStore, F: Fn() -> OperationId + Send + Sync + Unpin + 'static>
}
Ok(self
.store
.search_by_index_prefix(SearchSortKeyPrefixToAwaitedAction(get_state_prefix(state)))
.search_by_index_prefix(SearchStateToAwaitedAction(get_state_prefix(state)))
.await
.err_tip(|| "In RedisAwaitedActionDb::get_range_of_actions")?
.map_ok(move |awaited_action| {
Expand All @@ -552,7 +554,7 @@ impl<S: SchedulerStore, F: Fn() -> OperationId + Send + Sync + Unpin + 'static>
) -> Result<impl Stream<Item = Result<Self::Subscriber, Error>>, Error> {
Ok(self
.store
.search_by_index_prefix(SearchSortKeyPrefixToAwaitedAction(""))
.search_by_index_prefix(SearchStateToAwaitedAction(""))
.await
.err_tip(|| "In RedisAwaitedActionDb::get_range_of_actions")?
.map_ok(move |awaited_action| {
Expand Down
19 changes: 10 additions & 9 deletions nativelink-scheduler/tests/redis_store_awaited_action_db_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,16 +194,14 @@ async fn add_action_smoke_test() -> Result<(), Error> {

const SUB_CHANNEL: &str = "sub_channel";
let ft_aggregate_args = vec![
format!("aa__unique_qualifier_{SCRIPT_VERSION}").into(),
format!("@unique_qualifier:{{ {INSTANCE_NAME}_SHA256_0000000000000000000000000000000000000000000000000000000000000000_0_c* }}").into(),
format!("aa__unique_qualifier__{SCRIPT_VERSION}").into(),
format!("@unique_qualifier:{{ {INSTANCE_NAME}_SHA256_0000000000000000000000000000000000000000000000000000000000000000_0_c }}").into(),
"LOAD".into(),
2.into(),
"data".into(),
"version".into(),
"SORTBY".into(),
2.into(),
"@unique_qualifier".into(),
"ASC".into(),
0.into(),
"WITHCURSOR".into(),
"COUNT".into(),
256.into(),
Expand Down Expand Up @@ -239,7 +237,7 @@ async fn add_action_smoke_test() -> Result<(), Error> {
cmd: Str::from_static("FT.CREATE"),
subcommand: None,
args: vec![
format!("aa__unique_qualifier_{SCRIPT_VERSION}").into(),
format!("aa__unique_qualifier__{SCRIPT_VERSION}").into(),
"ON".into(),
"HASH".into(),
"PREFIX".into(),
Expand All @@ -254,7 +252,6 @@ async fn add_action_smoke_test() -> Result<(), Error> {
"SCHEMA".into(),
"unique_qualifier".into(),
"TAG".into(),
"SORTABLE".into(),
],
},
Ok(RedisValue::Bytes(Bytes::from("data"))),
Expand Down Expand Up @@ -286,8 +283,10 @@ async fn add_action_smoke_test() -> Result<(), Error> {
RedisValue::Bytes(Bytes::from(serde_json::to_string(&worker_awaited_action).unwrap())),
"unique_qualifier".as_bytes().into(),
format!("{INSTANCE_NAME}_SHA256_0000000000000000000000000000000000000000000000000000000000000000_0_c").as_bytes().into(),
"state".as_bytes().into(),
"queued".as_bytes().into(),
"sort_key".as_bytes().into(),
"q_9223372041149743103".as_bytes().into(),
"80000000ffffffff".as_bytes().into(),
],
},
Ok(1.into() /* New version */),
Expand Down Expand Up @@ -360,8 +359,10 @@ async fn add_action_smoke_test() -> Result<(), Error> {
RedisValue::Bytes(Bytes::from(serde_json::to_string(&new_awaited_action).unwrap())),
"unique_qualifier".as_bytes().into(),
format!("{INSTANCE_NAME}_SHA256_0000000000000000000000000000000000000000000000000000000000000000_0_c").as_bytes().into(),
"state".as_bytes().into(),
"executing".as_bytes().into(),
"sort_key".as_bytes().into(),
"e_9223372041149743103".as_bytes().into(),
"80000000ffffffff".as_bytes().into(),
],
},
Ok(2.into() /* New version */),
Expand Down
75 changes: 47 additions & 28 deletions nativelink-store/src/redis_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,11 +681,12 @@ const fn fingerprint_create_index_template() -> u32 {
/// This will add some prefix data to the name to try and ensure
/// if the index definition changes, the name will get a new name.
macro_rules! get_index_name {
($prefix:expr, $field:expr) => {
($prefix:expr, $field:expr, $maybe_sort:expr) => {
format_args!(
"{}_{}_{:08x}",
"{}_{}_{}_{:08x}",
$prefix,
$field,
$maybe_sort.unwrap_or(""),
fingerprint_create_index_template(),
)
};
Expand Down Expand Up @@ -1026,19 +1027,20 @@ impl SchedulerStore for RedisStore {
where
K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send,
{
let index_value_prefix = index.index_value_prefix();
let index_value = index.index_value();
let run_ft_aggregate = || {
let client = self.client_pool.next().clone();
let sanitized_field = try_sanitize(index_value_prefix.as_ref()).err_tip(|| {
format!(
"In RedisStore::search_by_index_prefix::try_sanitize - {index_value_prefix:?}"
)
let sanitized_field = try_sanitize(index_value.as_ref()).err_tip(|| {
format!("In RedisStore::search_by_index_prefix::try_sanitize - {index_value:?}")
})?;
Ok::<_, Error>(async move {
ft_aggregate(
client,
format!("{}", get_index_name!(K::KEY_PREFIX, K::INDEX_NAME)),
format!("@{}:{{ {}* }}", K::INDEX_NAME, sanitized_field),
format!(
"{}",
get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
),
format!("@{}:{{ {} }}", K::INDEX_NAME, sanitized_field),
fred::types::FtAggregateOptions {
load: Some(fred::types::Load::Some(vec![
fred::types::SearchField {
Expand All @@ -1055,10 +1057,9 @@ impl SchedulerStore for RedisStore {
max_idle: Some(CURSOR_IDLE_MS),
}),
pipeline: vec![fred::types::AggregateOperation::SortBy {
properties: vec![(
format!("@{}", K::INDEX_NAME).into(),
fred::types::SortOrder::Asc,
)],
properties: K::MAYBE_SORT_KEY.map_or_else(Vec::new, |v| {
vec![(format!("@{v}").into(), fred::types::SortOrder::Asc)]
}),
max: None,
}],
..Default::default()
Expand All @@ -1069,11 +1070,40 @@ impl SchedulerStore for RedisStore {
};
let stream = run_ft_aggregate()?
.or_else(|_| async move {
let mut schema = vec![SearchSchema {
field_name: K::INDEX_NAME.into(),
alias: None,
kind: SearchSchemaKind::Tag {
sortable: false,
unf: false,
separator: None,
casesensitive: false,
withsuffixtrie: false,
noindex: false,
},
}];
if let Some(sort_key) = K::MAYBE_SORT_KEY {
schema.push(SearchSchema {
field_name: sort_key.into(),
alias: None,
kind: SearchSchemaKind::Tag {
sortable: true,
unf: false,
separator: None,
casesensitive: false,
withsuffixtrie: false,
noindex: false,
},
});
}
let create_result = self
.client_pool
.next()
.ft_create::<(), _>(
format!("{}", get_index_name!(K::KEY_PREFIX, K::INDEX_NAME)),
format!(
"{}",
get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
),
FtCreateOptions {
on: Some(fred::types::IndexKind::Hash),
prefixes: vec![K::KEY_PREFIX.into()],
Expand All @@ -1084,30 +1114,19 @@ impl SchedulerStore for RedisStore {
temporary: Some(INDEX_TTL_S),
..Default::default()
},
vec![SearchSchema {
field_name: K::INDEX_NAME.into(),
alias: None,
kind: SearchSchemaKind::Tag {
sortable: true,
unf: false,
separator: None,
casesensitive: false,
withsuffixtrie: false,
noindex: false,
},
}],
schema,
)
.await
.err_tip(|| {
format!(
"Error with ft_create in RedisStore::search_by_index_prefix({})",
get_index_name!(K::KEY_PREFIX, K::INDEX_NAME),
get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
)
});
let run_result = run_ft_aggregate()?.await.err_tip(|| {
format!(
"Error with second ft_aggregate in RedisStore::search_by_index_prefix({})",
get_index_name!(K::KEY_PREFIX, K::INDEX_NAME),
get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
)
});
// Creating the index will race which is ok. If it fails to create, we only
Expand Down
5 changes: 4 additions & 1 deletion nativelink-util/src/store_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -855,11 +855,14 @@ pub trait SchedulerIndexProvider {
/// The name of the index.
const INDEX_NAME: &'static str;

/// The sort key for the index (if any).
const MAYBE_SORT_KEY: Option<&'static str> = None;

/// If the data is versioned.
type Versioned: BoolValue;

/// The value of the index.
fn index_value_prefix(&self) -> Cow<'_, str>;
fn index_value(&self) -> Cow<'_, str>;
}

/// Provides a key to lookup data in the store.
Expand Down

0 comments on commit 7609aec

Please sign in to comment.