Skip to content

Commit

Permalink
🧪 passing, pause is working as intended ✨
Browse files Browse the repository at this point in the history
  • Loading branch information
0xJepsen committed Aug 16, 2023
1 parent 293dcd3 commit 5cd959f
Show file tree
Hide file tree
Showing 8 changed files with 220 additions and 241 deletions.
44 changes: 26 additions & 18 deletions arbiter-core/src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,16 +291,21 @@ impl Environment {

// Await for the condvar alert to change the state
State::Paused => {
// this logic here ensures we catch the last transaction and send the appropriate error so that we dont hang in limbo forever
if let Ok((_, _, sender)) = tx_receiver.recv(){
let error_outcome = TransactionOutcome::Error(EnvironmentError::Pause {
cause: "Environment is paused".into(),
});
// this logic here ensures we catch the last transaction and send the
// appropriate error so that we dont hang in limbo forever
// loop till tx_receiver is empty
if let Ok((_, _, sender)) = tx_receiver.recv() {
let error_outcome =
TransactionOutcome::Error(EnvironmentError::Pause {
cause: "Environment is paused".into(),
});
let revm_result = RevmResult {
outcome: error_outcome,
block_number: convert_uint_to_u64(evm.env.block.number).map_err(|e| EnvironmentError::Conversion {
cause: format!("{:?}", e),
})?,
block_number: convert_uint_to_u64(evm.env.block.number).map_err(
|e| EnvironmentError::Conversion {
cause: format!("{:?}", e),
},
)?,
};
sender.send(revm_result).unwrap();
}
Expand Down Expand Up @@ -460,26 +465,29 @@ pub(crate) struct Socket {
pub(crate) event_broadcaster: Arc<Mutex<EventBroadcaster>>,
}


/// Represents the possible outcomes of an EVM transaction.
///
/// This enum is used to encapsulate both successful transaction results and potential errors.
/// - `Success`: Indicates that the transaction was executed successfully and contains the
/// result of the execution. The wrapped `ExecutionResult` provides detailed information about
/// the transaction's execution, such as returned values or changes made to the state.
/// - `Error`: Indicates that the transaction failed due to some error condition. The wrapped
/// `EnvironmentError` provides specifics about the error, allowing callers to take appropriate
/// action or relay more informative error messages.
/// This enum is used to encapsulate both successful transaction results and
/// potential errors.
/// - `Success`: Indicates that the transaction was executed successfully and
/// contains the result of the execution. The wrapped `ExecutionResult`
/// provides detailed information about the transaction's execution, such as
/// returned values or changes made to the state.
/// - `Error`: Indicates that the transaction failed due to some error
/// condition. The wrapped `EnvironmentError` provides specifics about the
/// error, allowing callers to take appropriate action or relay more
/// informative error messages.
#[derive(Debug, Clone)]
pub(crate) enum TransactionOutcome {
/// Represents a successfully executed transaction.
///
/// Contains the result of the transaction's execution.
Success(ExecutionResult),

/// Represents a failed transaction due to some error.
///
/// Contains information about the error that caused the transaction failure.
/// Contains information about the error that caused the transaction
/// failure.
Error(EnvironmentError),
}
/// Represents the result of an EVM transaction.
Expand Down
161 changes: 82 additions & 79 deletions arbiter-core/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,30 +188,31 @@ impl Manager {
environment_label: S,
) -> Result<(), ManagerError> {
match self.environments.get_mut(&environment_label.clone().into()) {
Some(environment) => match environment.state.load(std::sync::atomic::Ordering::SeqCst)
{
State::Initialization => {
environment.run();
info!("Started environment labeled {}", environment_label.into());
Ok(())
}
State::Paused => {
environment
.state
.store(State::Running, std::sync::atomic::Ordering::SeqCst);
let (lock, pausevar) = &*environment.pausevar;
let _guard = lock.lock().unwrap();
pausevar.notify_all();
info!("Restarted environment labeled {}", environment_label.into());
Ok(())
Some(environment) => {
match environment.state.load(std::sync::atomic::Ordering::SeqCst) {
State::Initialization => {
environment.run();
info!("Started environment labeled {}", environment_label.into());
Ok(())
}
State::Paused => {
environment
.state
.store(State::Running, std::sync::atomic::Ordering::SeqCst);
let (lock, pausevar) = &*environment.pausevar;
let _guard = lock.lock().unwrap();
pausevar.notify_all();
info!("Restarted environment labeled {}", environment_label.into());
Ok(())
}
State::Running => Err(ManagerError::EnvironmentAlreadyRunning {
label: environment_label.into(),
}),
State::Stopped => Err(ManagerError::EnvironmentStopped {
label: environment_label.into(),
}),
}
State::Running => Err(ManagerError::EnvironmentAlreadyRunning {
label: environment_label.into(),
}),
State::Stopped => Err(ManagerError::EnvironmentStopped {
label: environment_label.into(),
}),
},
}
None => Err(ManagerError::EnvironmentDoesNotExist {
label: environment_label.into(),
}),
Expand Down Expand Up @@ -259,25 +260,26 @@ impl Manager {
environment_label: S,
) -> Result<(), ManagerError> {
match self.environments.get_mut(&environment_label.clone().into()) {
Some(environment) => match environment.state.load(std::sync::atomic::Ordering::SeqCst)
{
State::Initialization => Err(ManagerError::EnvironmentNotRunning {
label: environment_label.into(),
}),
State::Running => {
environment
.state
.store(State::Paused, std::sync::atomic::Ordering::SeqCst);
info!("Paused environment labeled {}", environment_label.into());
Ok(())
Some(environment) => {
match environment.state.load(std::sync::atomic::Ordering::SeqCst) {
State::Initialization => Err(ManagerError::EnvironmentNotRunning {
label: environment_label.into(),
}),
State::Running => {
environment
.state
.store(State::Paused, std::sync::atomic::Ordering::SeqCst);
info!("Paused environment labeled {}", environment_label.into());
Ok(())
}
State::Paused => Err(ManagerError::EnvironmentAlreadyPaused {
label: environment_label.into(),
}),
State::Stopped => Err(ManagerError::EnvironmentStopped {
label: environment_label.into(),
}),
}
State::Paused => Err(ManagerError::EnvironmentAlreadyPaused {
label: environment_label.into(),
}),
State::Stopped => Err(ManagerError::EnvironmentStopped {
label: environment_label.into(),
}),
},
}
None => Err(ManagerError::EnvironmentDoesNotExist {
label: environment_label.into(),
}),
Expand Down Expand Up @@ -325,51 +327,52 @@ impl Manager {
environment_label: S,
) -> Result<(), ManagerError> {
match self.environments.get_mut(&environment_label.clone().into()) {
Some(environment) => match environment.state.load(std::sync::atomic::Ordering::SeqCst)
{
State::Initialization => Err(ManagerError::EnvironmentNotRunning {
label: environment_label.into(),
}),
State::Running => {
environment
.state
.store(State::Stopped, std::sync::atomic::Ordering::SeqCst);
match environment.handle.take() {
Some(handle) => {
if let Err(_) = handle.join() {
return Err(ManagerError::ThreadPanic);
Some(environment) => {
match environment.state.load(std::sync::atomic::Ordering::SeqCst) {
State::Initialization => Err(ManagerError::EnvironmentNotRunning {
label: environment_label.into(),
}),
State::Running => {
environment
.state
.store(State::Stopped, std::sync::atomic::Ordering::SeqCst);
match environment.handle.take() {
Some(handle) => {
if handle.join().is_err() {
return Err(ManagerError::ThreadPanic);
}
}
None => return Err(ManagerError::NoHandleAvailable),
}
None => return Err(ManagerError::NoHandleAvailable),
warn!(
"Stopped running environment labeled {}",
environment_label.into()
);
Ok(())
}
warn!(
"Stopped running environment labeled {}",
environment_label.into()
);
Ok(())
}
State::Paused => {
environment
.state
.store(State::Stopped, std::sync::atomic::Ordering::SeqCst);
match environment.handle.take() {
Some(handle) => {
if let Err(_) = handle.join() {
return Err(ManagerError::ThreadPanic);
State::Paused => {
environment
.state
.store(State::Stopped, std::sync::atomic::Ordering::SeqCst);
match environment.handle.take() {
Some(handle) => {
if handle.join().is_err() {
return Err(ManagerError::ThreadPanic);
}
}
None => return Err(ManagerError::NoHandleAvailable),
}
None => return Err(ManagerError::NoHandleAvailable),
warn!(
"Stopped paused environment labeled {}",
environment_label.into()
);
Ok(())
}
warn!(
"Stopped paused environment labeled {}",
environment_label.into()
);
Ok(())
State::Stopped => Err(ManagerError::EnvironmentStopped {
label: environment_label.into(),
}),
}
State::Stopped => Err(ManagerError::EnvironmentStopped {
label: environment_label.into(),
}),
},
}
None => Err(ManagerError::EnvironmentDoesNotExist {
label: environment_label.into(),
}),
Expand Down
59 changes: 44 additions & 15 deletions arbiter-core/src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ use revm::primitives::{CreateScheme, ExecutionResult, Output, TransactTo, TxEnv,
use serde::{de::DeserializeOwned, Serialize};
use thiserror::Error;

use crate::environment::{Environment, EventBroadcaster, ResultReceiver, ResultSender, TxSender, TransactionOutcome};
use crate::environment::{
Environment, EventBroadcaster, ResultReceiver, ResultSender, TransactionOutcome, TxSender,
};

/// A middleware structure that integrates with `revm`.
///
Expand Down Expand Up @@ -260,8 +262,16 @@ impl Middleware for RevmMiddleware {
tx: T,
_block: Option<BlockId>,
) -> Result<PendingTransaction<'_, Self::Provider>, Self::Error> {
if self.provider().as_ref().environment_state.load(std::sync::atomic::Ordering::SeqCst) == crate::environment::State::Paused {
return Err(RevmMiddlewareError::Send { cause: "Environment Paused".to_string() });
if self
.provider()
.as_ref()
.environment_state
.load(std::sync::atomic::Ordering::SeqCst)
== crate::environment::State::Paused
{
return Err(RevmMiddlewareError::Send {
cause: "Environment Paused".to_string(),
});
}

let tx: TypedTransaction = tx.into();
Expand Down Expand Up @@ -321,32 +331,38 @@ impl Middleware for RevmMiddleware {
logs,
output,
} = unpack_execution_result(execution_result)?;

match output {
Output::Create(_, address) => {
let address = address.ok_or(RevmMiddlewareError::MissingData {
cause: "Address missing in transaction!".to_string(),
})?;
let mut pending_tx =
PendingTransaction::new(ethers::types::H256::zero(), self.provider());
pending_tx.state = PendingTxState::RevmDeployOutput(recast_address(address));
pending_tx.state =
PendingTxState::RevmDeployOutput(recast_address(address));
return Ok(pending_tx);
}
Output::Call(_) => {
let mut pending_tx =
PendingTransaction::new(ethers::types::H256::zero(), self.provider());

pending_tx.state =
PendingTxState::RevmTransactOutput(logs, revm_result.block_number);
return Ok(pending_tx);
}
}
},
}
TransactionOutcome::Error(err) => {
return Err(RevmMiddlewareError::Receive { cause: format!("Error recieving response from the environement with environment error: {}", err).to_string() });

return Err(RevmMiddlewareError::Receive {
cause: format!(
"Error recieving response from the environement with environment error: {}",
err
)
.to_string(),
});
}
}

}

/// Calls a contract method without creating a worldstate-changing
Expand All @@ -362,8 +378,16 @@ impl Middleware for RevmMiddleware {
tx: &TypedTransaction,
_block: Option<BlockId>,
) -> Result<Bytes, Self::Error> {
if self.provider().as_ref().environment_state.load(std::sync::atomic::Ordering::SeqCst) == crate::environment::State::Paused {
return Err(RevmMiddlewareError::Send { cause: "Environment Paused".to_string() });
if self
.provider()
.as_ref()
.environment_state
.load(std::sync::atomic::Ordering::SeqCst)
== crate::environment::State::Paused
{
return Err(RevmMiddlewareError::Send {
cause: "Environment Paused".to_string(),
});
}
let tx = tx.clone();

Expand Down Expand Up @@ -423,9 +447,15 @@ impl Middleware for RevmMiddleware {
return Ok(Bytes::from(bytes.to_vec()));
}
}
},
}
TransactionOutcome::Error(err) => {
return Err(RevmMiddlewareError::Receive { cause: format!("Error recieving response from the environement with environment error: {}", err).to_string() });
return Err(RevmMiddlewareError::Receive {
cause: format!(
"Error recieving response from the environement with environment error: {}",
err
)
.to_string(),
});
}
}
}
Expand Down Expand Up @@ -519,7 +549,6 @@ pub struct Connection {
/// generated by `revm` and output by the [`Environment`].
filter_receivers: Arc<tokio::sync::Mutex<HashMap<ethers::types::U256, FilterReceiver>>>,


environment_state: Arc<crate::environment::AtomicState>,
}

Expand Down
Loading

0 comments on commit 5cd959f

Please sign in to comment.