Skip to content

Commit

Permalink
refactor(app): state module
Browse files Browse the repository at this point in the history
  • Loading branch information
snormore committed Aug 23, 2024
1 parent 49241a8 commit 4da63d3
Show file tree
Hide file tree
Showing 14 changed files with 214 additions and 156 deletions.
6 changes: 3 additions & 3 deletions core/application/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use lightning_interfaces::types::{ChainId, NodeInfo};
use tracing::{error, info};

use crate::config::{Config, StorageConfig};
use crate::env::{Env, UpdateWorker};
use crate::query_runner::QueryRunner;
use crate::env::{ApplicationEnv, Env, UpdateWorker};
use crate::state::QueryRunner;
pub struct Application<C: Collection> {
update_socket: Mutex<Option<ExecutionEngineSocket>>,
query_runner: QueryRunner,
Expand Down Expand Up @@ -102,7 +102,7 @@ impl<C: Collection> ApplicationInterface<C> for Application<C> {
let mut counter = 0;

loop {
match Env::new(config, Some((checkpoint_hash, &checkpoint))) {
match ApplicationEnv::new(config, Some((checkpoint_hash, &checkpoint))) {
Ok(mut env) => {
info!(
"Successfully built database from checkpoint with hash {checkpoint_hash:?}"
Expand Down
101 changes: 20 additions & 81 deletions core/application/src/env.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
use std::collections::BTreeSet;
use std::path::Path;
use std::time::Duration;

use affair::AsyncWorker as WorkerTrait;
use anyhow::{Context, Result};
use atomo::{Atomo, AtomoBuilder, DefaultSerdeBackend, QueryPerm, StorageBackend, UpdatePerm};
use atomo::{AtomoBuilder, DefaultSerdeBackend, SerdeBackend, StorageBackend};
use atomo_rocks::{Cache as RocksCache, Env as RocksEnv, Options};
use fleek_crypto::{ClientPublicKey, ConsensusPublicKey, EthAddress, NodePublicKey};
use hp_fixed::unsigned::HpUfixed;
use lightning_interfaces::prelude::*;
use lightning_interfaces::types::{
AccountInfo,
Blake3Hash,
Block,
BlockExecutionResponse,
Committee,
Expand All @@ -24,31 +22,28 @@ use lightning_interfaces::types::{
NodeInfo,
NodeServed,
ProtocolParams,
ReportedReputationMeasurements,
Service,
ServiceId,
ServiceRevenue,
TotalServed,
TransactionReceipt,
TransactionResponse,
TxHash,
Value,
};
use lightning_metrics::increment_counter;
use tracing::warn;

use crate::config::{Config, StorageConfig};
use crate::genesis::GenesisPrices;
use crate::query_runner::QueryRunner;
use crate::state::State;
use crate::state::{ApplicationState, QueryRunner};
use crate::storage::{AtomoStorage, AtomoStorageBuilder};
use crate::table::StateTables;

pub struct Env<P, B: StorageBackend> {
pub inner: Atomo<P, B>,
pub struct Env<B: StorageBackend, S: SerdeBackend> {
pub inner: ApplicationState<B, S>,
}

impl Env<UpdatePerm, AtomoStorage> {
pub type ApplicationEnv = Env<AtomoStorage, DefaultSerdeBackend>;

impl ApplicationEnv {
pub fn new(config: &Config, checkpoint: Option<([u8; 32], &[u8])>) -> Result<Self> {
let storage = match config.storage {
StorageConfig::RocksDb => {
Expand Down Expand Up @@ -83,60 +78,17 @@ impl Env<UpdatePerm, AtomoStorage> {
StorageConfig::InMemory => AtomoStorageBuilder::new::<&Path>(None),
};

let mut atomo = AtomoBuilder::<AtomoStorageBuilder, DefaultSerdeBackend>::new(storage);
atomo = atomo
.with_table::<Metadata, Value>("metadata")
.with_table::<EthAddress, AccountInfo>("account")
.with_table::<ClientPublicKey, EthAddress>("client_keys")
.with_table::<NodeIndex, NodeInfo>("node")
.with_table::<ConsensusPublicKey, NodeIndex>("consensus_key_to_index")
.with_table::<NodePublicKey, NodeIndex>("pub_key_to_index")
.with_table::<(NodeIndex, NodeIndex), Duration>("latencies")
.with_table::<Epoch, Committee>("committee")
.with_table::<ServiceId, Service>("service")
.with_table::<ProtocolParams, u128>("parameter")
.with_table::<NodeIndex, Vec<ReportedReputationMeasurements>>("rep_measurements")
.with_table::<NodeIndex, u8>("rep_scores")
.with_table::<NodeIndex, u8>("submitted_rep_measurements")
.with_table::<NodeIndex, NodeServed>("current_epoch_served")
.with_table::<NodeIndex, NodeServed>("last_epoch_served")
.with_table::<Epoch, TotalServed>("total_served")
.with_table::<CommodityTypes, HpUfixed<6>>("commodity_prices")
.with_table::<ServiceId, ServiceRevenue>("service_revenue")
.with_table::<TxHash, ()>("executed_digests")
.with_table::<NodeIndex, u8>("uptime")
.with_table::<Blake3Hash, BTreeSet<NodeIndex>>("uri_to_node")
.with_table::<NodeIndex, BTreeSet<Blake3Hash>>("node_to_uri")
.enable_iter("current_epoch_served")
.enable_iter("rep_measurements")
.enable_iter("submitted_rep_measurements")
.enable_iter("rep_scores")
.enable_iter("latencies")
.enable_iter("node")
.enable_iter("executed_digests")
.enable_iter("uptime")
.enable_iter("service_revenue")
.enable_iter("uri_to_node")
.enable_iter("node_to_uri");

#[cfg(debug_assertions)]
{
atomo = atomo
.enable_iter("consensus_key_to_index")
.enable_iter("pub_key_to_index");
}
let atomo = AtomoBuilder::<AtomoStorageBuilder, DefaultSerdeBackend>::new(storage);

Ok(Self {
inner: atomo.build()?,
inner: ApplicationState::build(atomo)?,
})
}

pub fn query_runner(&self) -> QueryRunner {
QueryRunner::new(self.inner.query())
self.inner.query()
}
}

impl<B: StorageBackend> Env<UpdatePerm, B> {
#[autometrics::autometrics]
async fn run<F, P>(&mut self, mut block: Block, get_putter: F) -> BlockExecutionResponse
where
Expand All @@ -145,10 +97,7 @@ impl<B: StorageBackend> Env<UpdatePerm, B> {
{
let response = self.inner.run(move |ctx| {
// Create the app/execution environment
let backend = StateTables {
table_selector: ctx,
};
let app = State::new(backend);
let app = ApplicationState::executor(ctx);
let last_block_hash = app.get_block_hash();

let block_number = app.get_block_number() + 1;
Expand Down Expand Up @@ -199,14 +148,14 @@ impl<B: StorageBackend> Env<UpdatePerm, B> {
response.txn_receipts.push(receipt);
}

// Set the last executed block hash and sub dag index
// if epoch changed a new committee starts and subdag starts back at 0
let (new_sub_dag_index, new_sub_dag_round) = if response.change_epoch {
(0, 0)
} else {
(block.sub_dag_index, block.sub_dag_round)
};
// Set the last executed block hash and sub dag index
app.set_last_block(block.digest, new_sub_dag_index, new_sub_dag_round);
app.set_last_block(response.block_hash, new_sub_dag_index, new_sub_dag_round);

// Return the response
response
Expand Down Expand Up @@ -243,13 +192,6 @@ impl<B: StorageBackend> Env<UpdatePerm, B> {
response
}

/// Returns an identical environment but with query permissions
pub fn query_socket(&self) -> Env<QueryPerm, B> {
Env {
inner: self.inner.query(),
}
}

/// Tries to seeds the application state with the genesis block
/// This function will panic if the genesis file cannot be decoded into the correct types
/// Will return true if database was empty and genesis needed to be loaded or false if there was
Expand Down Expand Up @@ -461,37 +403,34 @@ impl<B: StorageBackend> Env<UpdatePerm, B> {
}
}

metadata_table.insert(Metadata::Epoch, Value::Epoch(0));
Ok(true)
metadata_table.insert(Metadata::Epoch, Value::Epoch(0));
Ok(true)
})
}

// Should only be called after saving or loading from an epoch checkpoint
pub fn update_last_epoch_hash(&mut self, state_hash: [u8; 32]) {
self.inner.run(move |ctx| {
let backend = StateTables {
table_selector: ctx,
};
let app = State::new(backend);
let app = ApplicationState::executor(ctx);
app.set_last_epoch_hash(state_hash);
})
}
}

impl Default for Env<UpdatePerm, AtomoStorage> {
impl Default for ApplicationEnv {
fn default() -> Self {
Self::new(&Config::default(), None).unwrap()
}
}

/// The socket that receives all update transactions
pub struct UpdateWorker<C: Collection> {
env: Env<UpdatePerm, AtomoStorage>,
env: ApplicationEnv,
blockstore: C::BlockstoreInterface,
}

impl<C: Collection> UpdateWorker<C> {
pub fn new(env: Env<UpdatePerm, AtomoStorage>, blockstore: C::BlockstoreInterface) -> Self {
pub fn new(env: ApplicationEnv, blockstore: C::BlockstoreInterface) -> Self {
Self { env, blockstore }
}
}
Expand All @@ -518,7 +457,7 @@ mod env_tests {
.write_to_dir(temp_dir.path().to_path_buf().try_into().unwrap())
.unwrap();
let config = Config::test(genesis_path);
let mut env = Env::<_, AtomoStorage>::new(&config, None).unwrap();
let mut env = ApplicationEnv::new(&config, None).unwrap();

assert!(env.apply_genesis_block(&config).unwrap());

Expand Down
4 changes: 1 addition & 3 deletions core/application/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ pub mod config;
pub mod env;
pub mod genesis;
pub mod network;
pub mod query_runner;
pub mod state;
pub(crate) mod storage;
pub mod table;
pub mod storage;
#[cfg(test)]
mod tests;
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ pub trait TableRef<K, V> {
fn remove(&self, key: &K);
}

pub struct StateTables<'selector, B: StorageBackend, S: SerdeBackend> {
pub struct StateContext<'selector, B: StorageBackend, S: SerdeBackend> {
pub table_selector: &'selector TableSelector<B, S>,
}

impl<'selector, B: StorageBackend, S: SerdeBackend> Backend for StateTables<'selector, B, S> {
impl<'selector, B: StorageBackend, S: SerdeBackend> Backend for StateContext<'selector, B, S> {
type Ref<
K: Eq + Hash + Send + Serialize + DeserializeOwned + 'static,
V: Clone + Send + Serialize + DeserializeOwned + 'static,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use rand::rngs::StdRng;
use rand::seq::SliceRandom;
use rand::SeedableRng;

use crate::table::{Backend, TableRef};
use super::context::{Backend, TableRef};

/// Reported measurements are weighted by the reputation score of the reporting node.
/// If there is no reputation score for the reporting node, we use a quantile from the array
Expand Down Expand Up @@ -96,11 +96,11 @@ lazy_static! {
static ref BIG_HUNDRED: HpUfixed<18> = HpUfixed::<18>::from(100_u64);
}

/// The state of the Application
/// The state executor encapsuates the logic for executing transactions.
///
/// The functions implemented on this struct are the "Smart Contracts" of the application layer
/// All state changes come from Transactions and start at execute_transaction
pub struct State<B: Backend> {
pub struct StateExecutor<B: Backend> {
pub metadata: B::Ref<Metadata, Value>,
pub account_info: B::Ref<EthAddress, AccountInfo>,
pub client_keys: B::Ref<ClientPublicKey, EthAddress>,
Expand All @@ -126,7 +126,7 @@ pub struct State<B: Backend> {
pub backend: B,
}

impl<B: Backend> State<B> {
impl<B: Backend> StateExecutor<B> {
pub fn new(backend: B) -> Self {
Self {
metadata: backend.get_table_reference("metadata"),
Expand Down Expand Up @@ -155,6 +155,7 @@ impl<B: Backend> State<B> {
}
}

/// Executes a generic transaction.
pub fn execute_transaction(&self, txn: TransactionRequest) -> TransactionResponse {
let hash = txn.hash();
let (sender, response) = match txn {
Expand All @@ -173,9 +174,8 @@ impl<B: Backend> State<B> {
response
}

/// This function is the entry point of a transaction
/// Executes a fleek transaction.
fn execute_fleek_transaction(&self, txn: UpdateRequest) -> TransactionResponse {
// Execute transaction
let response = match txn.payload.method {
UpdateMethod::SubmitDeliveryAcknowledgmentAggregation {
commodity,
Expand Down Expand Up @@ -274,6 +274,7 @@ impl<B: Backend> State<B> {
response
}

/// Executes an ethereum transaction.
fn execute_ethereum_transaction(&self, txn: EthersTransaction) -> TransactionResponse {
let to_address = match txn.to {
Some(address) => address,
Expand Down
7 changes: 7 additions & 0 deletions core/application/src/state/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mod context;
mod executor;
mod query;
mod writer;

pub use query::QueryRunner;
pub use writer::ApplicationState;
Loading

0 comments on commit 4da63d3

Please sign in to comment.