From 482fc9f19d7657152be5a5cda000d279db02e67a Mon Sep 17 00:00:00 2001 From: Blaise Bruer Date: Mon, 9 Sep 2024 16:17:43 -0500 Subject: [PATCH] Add StoreAwaitedActionDb API This is pre-work for the introducing a distributed redis scheduler. This is the API for how to interact between the scheduler and the stores. towards: #359 --- nativelink-scheduler/BUILD.bazel | 1 + .../src/awaited_action_db/awaited_action.rs | 4 + nativelink-scheduler/src/lib.rs | 1 + .../src/store_awaited_action_db.rs | 561 ++++++++++++++++++ nativelink-util/src/action_messages.rs | 30 + nativelink-util/src/store_trait.rs | 136 ++++- 6 files changed, 732 insertions(+), 1 deletion(-) create mode 100644 nativelink-scheduler/src/store_awaited_action_db.rs diff --git a/nativelink-scheduler/BUILD.bazel b/nativelink-scheduler/BUILD.bazel index c448f6d241..d312ec568b 100644 --- a/nativelink-scheduler/BUILD.bazel +++ b/nativelink-scheduler/BUILD.bazel @@ -19,6 +19,7 @@ rust_library( "src/memory_awaited_action_db.rs", "src/platform_property_manager.rs", "src/property_modifier_scheduler.rs", + "src/store_awaited_action_db.rs", "src/simple_scheduler.rs", "src/simple_scheduler_state_manager.rs", "src/worker.rs", diff --git a/nativelink-scheduler/src/awaited_action_db/awaited_action.rs b/nativelink-scheduler/src/awaited_action_db/awaited_action.rs index 5b65e2322e..ba25c0cd55 100644 --- a/nativelink-scheduler/src/awaited_action_db/awaited_action.rs +++ b/nativelink-scheduler/src/awaited_action_db/awaited_action.rs @@ -112,6 +112,10 @@ impl AwaitedAction { self.version.0 } + pub(crate) fn set_version(&mut self, version: u64) { + self.version = AwaitedActionVersion(version); + } + pub(crate) fn increment_version(&mut self) { self.version = AwaitedActionVersion(self.version.0 + 1); } diff --git a/nativelink-scheduler/src/lib.rs b/nativelink-scheduler/src/lib.rs index 5d47b49600..9638cd30e3 100644 --- a/nativelink-scheduler/src/lib.rs +++ b/nativelink-scheduler/src/lib.rs @@ -22,5 +22,6 @@ pub mod platform_property_manager; pub mod property_modifier_scheduler; pub mod simple_scheduler; mod simple_scheduler_state_manager; +pub mod store_awaited_action_db; pub mod worker; pub mod worker_scheduler; diff --git a/nativelink-scheduler/src/store_awaited_action_db.rs b/nativelink-scheduler/src/store_awaited_action_db.rs new file mode 100644 index 0000000000..4ed1cb72c7 --- /dev/null +++ b/nativelink-scheduler/src/store_awaited_action_db.rs @@ -0,0 +1,561 @@ +// Copyright 2024 The NativeLink Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::borrow::Cow; +use std::ops::Bound; +use std::sync::{Arc, Weak}; +use std::time::{Duration, SystemTime}; + +use bytes::Bytes; +use futures::{Stream, TryStreamExt}; +use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt}; +use nativelink_metric::MetricsComponent; +use nativelink_util::action_messages::{ + ActionInfo, ActionStage, ActionUniqueQualifier, OperationId, +}; +use nativelink_util::spawn; +use nativelink_util::store_trait::{ + FalseValue, SchedulerCurrentVersionProvider, SchedulerIndexProvider, SchedulerStore, + SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider, + SchedulerSubscription, SchedulerSubscriptionManager, StoreKey, TrueValue, +}; +use nativelink_util::task::JoinHandleDropGuard; +use tokio::sync::Notify; +use tracing::{event, Level}; + +use crate::awaited_action_db::{ + AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, SortedAwaitedAction, + SortedAwaitedActionState, +}; + +type ClientOperationId = OperationId; + +enum OperationSubscriberState { + Unsubscribed, + Subscribed(Sub), +} + +pub struct OperationSubscriber { + maybe_client_operation_id: Option, + subscription_key: OperationIdToAwaitedAction<'static>, + weak_store: Weak, + state: OperationSubscriberState< + ::Subscription, + >, + now_fn: fn() -> SystemTime, +} +impl OperationSubscriber { + fn new( + maybe_client_operation_id: Option, + subscription_key: OperationIdToAwaitedAction<'static>, + weak_store: Weak, + now_fn: fn() -> SystemTime, + ) -> Self { + Self { + maybe_client_operation_id, + subscription_key, + weak_store, + state: OperationSubscriberState::Unsubscribed, + now_fn, + } + } + + async fn get_awaited_action(&self) -> Result { + let store = self + .weak_store + .upgrade() + .err_tip(|| "Store gone in OperationSubscriber::get_awaited_action")?; + let key = self.subscription_key.borrow(); + let mut awaited_action = store + .get_and_decode(key.borrow()) + .await + .err_tip(|| format!("In OperationSubscriber::get_awaited_action {key:?}"))? + .ok_or_else(|| { + make_err!( + Code::NotFound, + "Could not find AwaitedAction for the given operation id {key:?}", + ) + })?; + if let Some(client_operation_id) = &self.maybe_client_operation_id { + let mut state = awaited_action.state().as_ref().clone(); + state.client_operation_id = client_operation_id.clone(); + awaited_action.set_state(Arc::new(state), Some((self.now_fn)())); + } + Ok(awaited_action) + } +} + +impl AwaitedActionSubscriber for OperationSubscriber { + async fn changed(&mut self) -> Result { + let store = self + .weak_store + .upgrade() + .err_tip(|| "Store gone in OperationSubscriber::get_awaited_action")?; + let subscription = match &mut self.state { + OperationSubscriberState::Unsubscribed => { + let subscription = store + .subscription_manager() + .err_tip(|| "In OperationSubscriber::changed::subscription_manager")? + .subscribe(self.subscription_key.borrow()) + .err_tip(|| "In OperationSubscriber::changed::subscribe")?; + self.state = OperationSubscriberState::Subscribed(subscription); + let OperationSubscriberState::Subscribed(subscription) = &mut self.state else { + unreachable!("Subscription should be in Subscribed state"); + }; + subscription + } + OperationSubscriberState::Subscribed(subscription) => subscription, + }; + subscription + .changed() + .await + .err_tip(|| "In OperationSubscriber::changed")?; + self.get_awaited_action() + .await + .err_tip(|| "In OperationSubscriber::changed") + } + + async fn borrow(&self) -> Result { + self.get_awaited_action() + .await + .err_tip(|| "In OperationSubscriber::borrow") + } +} + +fn awaited_action_decode(version: u64, data: Bytes) -> Result { + let mut awaited_action: AwaitedAction = serde_json::from_slice(&data) + .map_err(|e| make_input_err!("In AwaitedAction::decode - {e:?}"))?; + awaited_action.set_version(version); + Ok(awaited_action) +} + +const OPERATION_ID_TO_AWAITED_ACTION_KEY_PREFIX: &str = "aa_"; +const CLIENT_ID_TO_OPERATION_ID_KEY_PREFIX: &str = "cid_"; + +#[derive(Debug)] +struct OperationIdToAwaitedAction<'a>(Cow<'a, OperationId>); +impl OperationIdToAwaitedAction<'_> { + fn borrow(&self) -> OperationIdToAwaitedAction<'_> { + match self.0 { + Cow::Borrowed(operation_id) => OperationIdToAwaitedAction(Cow::Borrowed(operation_id)), + Cow::Owned(ref operation_id) => OperationIdToAwaitedAction(Cow::Borrowed(operation_id)), + } + } +} +impl SchedulerStoreKeyProvider for OperationIdToAwaitedAction<'_> { + type Versioned = TrueValue; + fn get_key(&self) -> StoreKey<'static> { + StoreKey::Str(Cow::Owned(format!( + "{OPERATION_ID_TO_AWAITED_ACTION_KEY_PREFIX}{}", + self.0 + ))) + } +} +impl SchedulerStoreDecodeTo for OperationIdToAwaitedAction<'_> { + type DecodeOutput = AwaitedAction; + fn decode(version: u64, data: Bytes) -> Result { + awaited_action_decode(version, data) + } +} + +struct ClientIdToOperationId<'a>(&'a OperationId); +impl SchedulerStoreKeyProvider for ClientIdToOperationId<'_> { + type Versioned = FalseValue; + fn get_key(&self) -> StoreKey<'static> { + StoreKey::Str(Cow::Owned(format!( + "{CLIENT_ID_TO_OPERATION_ID_KEY_PREFIX}{}", + self.0 + ))) + } +} +impl SchedulerStoreDecodeTo for ClientIdToOperationId<'_> { + type DecodeOutput = OperationId; + fn decode(_version: u64, data: Bytes) -> Result { + OperationId::try_from(data).err_tip(|| "In ClientIdToOperationId::decode") + } +} + +// TODO(allada) We only need operation_id here, it would be nice if we had a way +// to tell the decoder we only care about specific fields. +struct SearchUniqueQualifierToAwaitedAction<'a>(&'a ActionUniqueQualifier); +impl SchedulerIndexProvider for SearchUniqueQualifierToAwaitedAction<'_> { + const KEY_PREFIX: &'static str = OPERATION_ID_TO_AWAITED_ACTION_KEY_PREFIX; + const INDEX_NAME: &'static str = "unique_qualifier"; + type Versioned = TrueValue; + fn index_value_prefix(&self) -> Cow<'_, str> { + Cow::Owned(format!("{}", self.0)) + } +} +impl SchedulerStoreDecodeTo for SearchUniqueQualifierToAwaitedAction<'_> { + type DecodeOutput = AwaitedAction; + fn decode(version: u64, data: Bytes) -> Result { + awaited_action_decode(version, data) + } +} + +struct SearchSortKeyPrefixToAwaitedAction(&'static str); +impl SchedulerIndexProvider for SearchSortKeyPrefixToAwaitedAction { + const KEY_PREFIX: &'static str = OPERATION_ID_TO_AWAITED_ACTION_KEY_PREFIX; + const INDEX_NAME: &'static str = "sort_key"; + type Versioned = TrueValue; + fn index_value_prefix(&self) -> Cow<'_, str> { + Cow::Borrowed(self.0) + } +} +impl SchedulerStoreDecodeTo for SearchSortKeyPrefixToAwaitedAction { + type DecodeOutput = AwaitedAction; + fn decode(version: u64, data: Bytes) -> Result { + awaited_action_decode(version, data) + } +} + +fn get_state_prefix(state: &SortedAwaitedActionState) -> &'static str { + match state { + SortedAwaitedActionState::CacheCheck => "x_", + SortedAwaitedActionState::Queued => "q_", + SortedAwaitedActionState::Executing => "e_", + SortedAwaitedActionState::Completed => "c_", + } +} + +struct UpdateOperationIdToAwaitedAction(AwaitedAction); +impl SchedulerCurrentVersionProvider for UpdateOperationIdToAwaitedAction { + fn current_version(&self) -> u64 { + self.0.version() + } +} +impl SchedulerStoreKeyProvider for UpdateOperationIdToAwaitedAction { + type Versioned = TrueValue; + fn get_key(&self) -> StoreKey<'static> { + OperationIdToAwaitedAction(Cow::Borrowed(self.0.operation_id())).get_key() + } +} +impl SchedulerStoreDataProvider for UpdateOperationIdToAwaitedAction { + fn try_into_bytes(self) -> Result { + serde_json::to_string(&self.0) + .map(Bytes::from) + .map_err(|e| make_input_err!("Could not convert AwaitedAction to json - {e:?}")) + } + fn get_indexes(&self) -> Result, Error> { + let unique_qualifier = &self.0.action_info().unique_qualifier; + let maybe_unique_qualifier = match &unique_qualifier { + ActionUniqueQualifier::Cachable(_) => Some(unique_qualifier), + ActionUniqueQualifier::Uncachable(_) => None, + }; + let mut output = Vec::with_capacity(1 + maybe_unique_qualifier.map_or(0, |_| 1)); + if maybe_unique_qualifier.is_some() { + output.push(( + "unique_qualifier", + Bytes::from(unique_qualifier.to_string()), + )) + } + { + let state = SortedAwaitedActionState::try_from(&self.0.state().stage) + .err_tip(|| "In UpdateOperationIdToAwaitedAction::get_index")?; + let sorted_awaited_action = SortedAwaitedAction::from(&self.0); + output.push(( + "sort_key", + Bytes::from(format!( + "{}{}", + get_state_prefix(&state), + sorted_awaited_action.sort_key.as_u64(), + )), + )); + } + Ok(output) + } +} + +struct UpdateClientIdToOperationId { + client_operation_id: ClientOperationId, + operation_id: OperationId, +} +impl SchedulerStoreKeyProvider for UpdateClientIdToOperationId { + type Versioned = FalseValue; + fn get_key(&self) -> StoreKey<'static> { + ClientIdToOperationId(&self.client_operation_id).get_key() + } +} +impl SchedulerStoreDataProvider for UpdateClientIdToOperationId { + fn try_into_bytes(self) -> Result { + serde_json::to_string(&self.operation_id) + .map(Bytes::from) + .map_err(|e| make_input_err!("Could not convert OperationId to json - {e:?}")) + } +} + +#[derive(MetricsComponent)] +pub struct StoreAwaitedActionDb { + store: Arc, + now_fn: fn() -> SystemTime, + _pull_task_change_subscriber_spawn: JoinHandleDropGuard<()>, +} + +impl StoreAwaitedActionDb { + pub fn new( + store: Arc, + task_change_publisher: Arc, + now_fn: fn() -> SystemTime, + ) -> Result { + let mut subscription = store + .subscription_manager() + .err_tip(|| "In RedisAwaitedActionDb::new")? + .subscribe(OperationIdToAwaitedAction(Cow::Owned(OperationId::String( + String::new(), + )))) + .err_tip(|| "In RedisAwaitedActionDb::new")?; + let pull_task_change_subscriber = spawn!( + "redis_awaited_action_db_pull_task_change_subscriber", + async move { + loop { + let changed_res = subscription + .changed() + .await + .err_tip(|| "In RedisAwaitedActionDb::new"); + if let Err(err) = changed_res { + event!( + Level::ERROR, + "Error waiting for pull task change subscriber in RedisAwaitedActionDb::new - {err:?}" + ); + // Sleep for a second to avoid a busy loop, then trigger the notify + // so if a reconnect happens we let local resources know that things + // might have changed. + tokio::time::sleep(Duration::from_secs(1)).await; + } + task_change_publisher.as_ref().notify_one(); + } + } + ); + Ok(Self { + store, + now_fn, + _pull_task_change_subscriber_spawn: pull_task_change_subscriber, + }) + } + + async fn try_subscribe( + &self, + client_operation_id: &ClientOperationId, + unique_qualifier: &ActionUniqueQualifier, + // TODO(allada) To simplify the scheduler 2024 refactor, we + // removed the ability to upgrade priorities of actions. + // we should add priority upgrades back in. + _priority: i32, + ) -> Result>, Error> { + match unique_qualifier { + ActionUniqueQualifier::Cachable(_) => {} + ActionUniqueQualifier::Uncachable(_) => return Ok(None), + } + let stream = self + .store + .search_by_index_prefix(SearchUniqueQualifierToAwaitedAction(unique_qualifier)) + .await + .err_tip(|| "In RedisAwaitedActionDb::try_subscribe")?; + tokio::pin!(stream); + let maybe_awaited_action = stream + .try_next() + .await + .err_tip(|| "In RedisAwaitedActionDb::try_subscribe")?; + match maybe_awaited_action { + Some(awaited_action) => { + // TODO(allada) We don't support joining completed jobs because we + // need to also check that all the data is still in the cache. + if awaited_action.state().stage.is_finished() { + return Ok(None); + } + // TODO(allada) We only care about the operation_id here, we should + // have a way to tell the decoder we only care about specific fields. + let operation_id = awaited_action.operation_id(); + Ok(Some(OperationSubscriber::new( + Some(client_operation_id.clone()), + OperationIdToAwaitedAction(Cow::Owned(operation_id.clone())), + Arc::downgrade(&self.store), + self.now_fn, + ))) + } + None => Ok(None), + } + } + + async fn inner_get_awaited_action_by_id( + &self, + client_operation_id: &ClientOperationId, + ) -> Result>, Error> { + let maybe_operation_id = self + .store + .get_and_decode(ClientIdToOperationId(client_operation_id)) + .await + .err_tip(|| "In RedisAwaitedActionDb::get_awaited_action_by_id")?; + let Some(operation_id) = maybe_operation_id else { + return Ok(None); + }; + Ok(Some(OperationSubscriber::new( + Some(client_operation_id.clone()), + OperationIdToAwaitedAction(Cow::Owned(operation_id)), + Arc::downgrade(&self.store), + self.now_fn, + ))) + } +} + +impl AwaitedActionDb for StoreAwaitedActionDb { + type Subscriber = OperationSubscriber; + + async fn get_awaited_action_by_id( + &self, + client_operation_id: &ClientOperationId, + ) -> Result, Error> { + self.inner_get_awaited_action_by_id(client_operation_id) + .await + } + + async fn get_by_operation_id( + &self, + operation_id: &OperationId, + ) -> Result, Error> { + Ok(Some(OperationSubscriber::new( + None, + OperationIdToAwaitedAction(Cow::Owned(operation_id.clone())), + Arc::downgrade(&self.store), + self.now_fn, + ))) + } + + async fn update_awaited_action(&self, new_awaited_action: AwaitedAction) -> Result<(), Error> { + let operation_id = new_awaited_action.operation_id().clone(); + let maybe_version = self + .store + .update_data(UpdateOperationIdToAwaitedAction(new_awaited_action)) + .await + .err_tip(|| "In RedisAwaitedActionDb::update_awaited_action")?; + if maybe_version.is_none() { + return Err(make_err!( + Code::Aborted, + "Could not update AwaitedAction because the version did not match for {operation_id:?}", + )); + } + Ok(()) + } + + async fn add_action( + &self, + client_operation_id: ClientOperationId, + action_info: Arc, + ) -> Result { + // Check to see if the action is already known and subscribe if it is. + let subscription = self + .try_subscribe( + &client_operation_id, + &action_info.unique_qualifier, + action_info.priority, + ) + .await + .err_tip(|| "In RedisAwaitedActionDb::add_action")?; + if let Some(sub) = subscription { + return Ok(sub); + } + + let new_operation_id = OperationId::default(); + let awaited_action = AwaitedAction::new(new_operation_id.clone(), action_info); + debug_assert!( + ActionStage::Queued == awaited_action.state().stage, + "Expected action to be queued" + ); + + // Note: Version is not needed with this api. + let _version = self + .store + .update_data(UpdateOperationIdToAwaitedAction(awaited_action)) + .await + .err_tip(|| "In RedisAwaitedActionDb::add_action")? + .err_tip(|| { + "Version match failed for new action insert in RedisAwaitedActionDb::add_action" + })?; + + self.store + .update_data(UpdateClientIdToOperationId { + client_operation_id: client_operation_id.clone(), + operation_id: new_operation_id.clone(), + }) + .await + .err_tip(|| "In RedisAwaitedActionDb::add_action")?; + + Ok(OperationSubscriber::new( + Some(client_operation_id), + OperationIdToAwaitedAction(Cow::Owned(new_operation_id)), + Arc::downgrade(&self.store), + self.now_fn, + )) + } + + async fn get_range_of_actions( + &self, + state: SortedAwaitedActionState, + start: Bound, + end: Bound, + desc: bool, + ) -> Result> + Send, Error> { + if !matches!(start, Bound::Unbounded) { + return Err(make_err!( + Code::Unimplemented, + "Start bound is not supported in RedisAwaitedActionDb::get_range_of_actions", + )); + } + if !matches!(end, Bound::Unbounded) { + return Err(make_err!( + Code::Unimplemented, + "Start bound is not supported in RedisAwaitedActionDb::get_range_of_actions", + )); + } + // TODO(allada) This API is not difficult to implement, but there is no code path + // that uses it, so no reason to implement it yet. + if !desc { + return Err(make_err!( + Code::Unimplemented, + "Descending order is not supported in RedisAwaitedActionDb::get_range_of_actions", + )); + } + Ok(self + .store + .search_by_index_prefix(SearchSortKeyPrefixToAwaitedAction(get_state_prefix(&state))) + .await + .err_tip(|| "In RedisAwaitedActionDb::get_range_of_actions")? + .map_ok(move |awaited_action| { + OperationSubscriber::new( + None, + OperationIdToAwaitedAction(Cow::Owned(awaited_action.operation_id().clone())), + Arc::downgrade(&self.store), + self.now_fn, + ) + })) + } + + async fn get_all_awaited_actions( + &self, + ) -> Result>, Error> { + Ok(self + .store + .search_by_index_prefix(SearchSortKeyPrefixToAwaitedAction("")) + .await + .err_tip(|| "In RedisAwaitedActionDb::get_range_of_actions")? + .map_ok(move |awaited_action| { + OperationSubscriber::new( + None, + OperationIdToAwaitedAction(Cow::Owned(awaited_action.operation_id().clone())), + Arc::downgrade(&self.store), + self.now_fn, + ) + })) + } +} diff --git a/nativelink-util/src/action_messages.rs b/nativelink-util/src/action_messages.rs index d389c17c55..046007c76c 100644 --- a/nativelink-util/src/action_messages.rs +++ b/nativelink-util/src/action_messages.rs @@ -108,6 +108,36 @@ impl From for OperationId { } } +impl TryFrom for OperationId { + type Error = Error; + + fn try_from(value: Bytes) -> Result { + // This is an optimized path to attempt to do the conversion in-place + // to avoid an extra allocation/copy. + match value.try_into_mut() { + // We are the only reference to the Bytes, so we can convert it into a Vec + // for free then convert the Vec to a String for free too. + Ok(value) => { + let value = String::from_utf8(value.into()).map_err(|e| { + make_input_err!( + "Failed to convert bytes to string in try_from for OperationId : {e:?}" + ) + })?; + Ok(Self::from(value)) + } + // We could not take ownership of the Bytes, so we may need to copy our data. + Err(value) => { + let value = std::str::from_utf8(&value).map_err(|e| { + make_input_err!( + "Failed to convert bytes to string in try_from for OperationId : {e:?}" + ) + })?; + Ok(Self::from(value)) + } + } + } +} + /// Unique id of worker. #[derive(Default, Eq, PartialEq, Hash, Copy, Clone, Serialize, Deserialize)] pub struct WorkerId(pub Uuid); diff --git a/nativelink-util/src/store_trait.rs b/nativelink-util/src/store_trait.rs index 55d221c01a..e986b532d2 100644 --- a/nativelink-util/src/store_trait.rs +++ b/nativelink-util/src/store_trait.rs @@ -23,7 +23,7 @@ use std::sync::{Arc, OnceLock}; use async_trait::async_trait; use bytes::{Bytes, BytesMut}; use futures::future::{select, Either}; -use futures::{join, try_join, Future, FutureExt}; +use futures::{join, try_join, Future, FutureExt, Stream}; use nativelink_error::{error_if, make_err, Code, Error, ResultExt}; use nativelink_metric::MetricsComponent; use rand::rngs::StdRng; @@ -834,3 +834,137 @@ pub trait StoreDriver: // Register health checks used to monitor the store. fn register_health(self: Arc, _registry: &mut HealthRegistryBuilder) {} } + +/// The instructions on how to decode a value from a Bytes & version into +/// the underlying type. +pub trait SchedulerStoreDecodeTo { + type DecodeOutput; + fn decode(version: u64, data: Bytes) -> Result; +} + +pub trait SchedulerSubscription: Send + Sync { + fn changed(&mut self) -> impl Future> + Send; +} + +pub trait SchedulerSubscriptionManager: Send + Sync { + type Subscription: SchedulerSubscription; + + fn subscribe(&self, key: K) -> Result + where + K: SchedulerStoreKeyProvider; +} + +/// The API surface for a scheduler store. +pub trait SchedulerStore: Send + Sync + 'static { + type SubscriptionManager: SchedulerSubscriptionManager; + + /// Returns the subscription manager for the scheduler store. + fn subscription_manager(&self) -> Result, Error>; + + /// Updates or inserts an entry into the underlying store. + /// Metadata about the key is attached to the compile-time type. + /// If StoreKeyProvider::Versioned is TrueValue, the data will not + /// be updated if the current version in the database does not match + /// the version in the passed in data. + /// No guarantees are made about when Version is FalseValue. + /// Indexes are guaranteed to be updated atomically with the data. + fn update_data(&self, data: T) -> impl Future, Error>> + Send + where + T: SchedulerStoreDataProvider + + SchedulerStoreKeyProvider + + SchedulerCurrentVersionProvider + + Send; + + /// Searches for all keys in the store that match the given index prefix. + fn search_by_index_prefix( + &self, + index: K, + ) -> impl Future< + Output = Result< + impl Stream::DecodeOutput, Error>> + Send, + Error, + >, + > + Send + where + K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send; + + /// Returns data for the provided key with the given version if + /// StoreKeyProvider::Versioned is TrueValue. + fn get_and_decode( + &self, + key: K, + ) -> impl Future::DecodeOutput>, Error>> + Send + where + K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send; +} + +/// A type that is used to let the scheduler store know what +/// index is beign requested. +pub trait SchedulerIndexProvider { + /// Only keys inserted with this prefix will be indexed. + const KEY_PREFIX: &'static str; + + /// The name of the index. + const INDEX_NAME: &'static str; + + /// If the data is versioned. + type Versioned: BoolValue; + + /// The value of the index. + fn index_value_prefix(&self) -> Cow<'_, str>; +} + +/// Provides a key to lookup data in the store. +pub trait SchedulerStoreKeyProvider { + /// If the data is versioned. + type Versioned: BoolValue; + + /// Returns the key for the data. + fn get_key(&self) -> StoreKey<'static>; +} + +/// Provides data to be stored in the scheduler store. +pub trait SchedulerStoreDataProvider { + /// Converts the data into bytes to be stored in the store. + fn try_into_bytes(self) -> Result; + + /// Returns the indexes for the data if any. + fn get_indexes(&self) -> Result, Error> { + Ok(Vec::new()) + } +} + +/// Provides the current version of the data in the store. +pub trait SchedulerCurrentVersionProvider { + /// Returns the current version of the data in the store. + fn current_version(&self) -> u64; +} + +/// Default implementation for when we are not providing a version +/// for the data. +impl SchedulerCurrentVersionProvider for T +where + T: SchedulerStoreKeyProvider, +{ + fn current_version(&self) -> u64 { + 0 + } +} + +pub trait BoolValue { + const VALUE: bool; +} +pub trait IsFalse {} +pub trait IsTrue {} + +pub struct TrueValue; +impl BoolValue for TrueValue { + const VALUE: bool = true; +} +impl IsTrue for TrueValue {} + +pub struct FalseValue; +impl BoolValue for FalseValue { + const VALUE: bool = false; +} +impl IsFalse for FalseValue {}