Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ethexe-tx-pool): Introduce basic tx-pool #4366

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ ethexe-utils = { path = "ethexe/utils", default-features = false }
ethexe-validator = { path = "ethexe/validator", default-features = false }
ethexe-rpc = { path = "ethexe/rpc", default-features = false }
ethexe-common = { path = "ethexe/common" }
ethexe-tx-pool = { path = "ethexe/tx-pool", default-features = false }

# Common executor between `sandbox-host` and `lazy-pages-fuzzer`
wasmi = { package = "wasmi", version = "0.38"}
Expand Down
3 changes: 3 additions & 0 deletions ethexe/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ ethexe-runtime-common.workspace = true
ethexe-prometheus-endpoint.workspace = true
ethexe-rpc.workspace = true
ethexe-utils.workspace = true
ethexe-tx-pool.workspace = true
gprimitives.workspace = true

clap = { workspace = true, features = ["derive"] }
Expand Down Expand Up @@ -70,6 +71,8 @@ alloy = { workspace = true, features = [
ntest = "0.9.3"
gear-core.workspace = true
gear-utils.workspace = true
reqwest.workspace = true
serde_json.workspace = true

demo-ping = { workspace = true, features = ["debug", "ethexe"] }
demo-async = { workspace = true, features = ["debug", "ethexe"] }
79 changes: 72 additions & 7 deletions ethexe/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ use ethexe_observer::{RequestBlockData, RequestEvent};
use ethexe_processor::{LocalOutcome, ProcessorConfig};
use ethexe_sequencer::agro::AggregatedCommitments;
use ethexe_signer::{Digest, PublicKey, Signature, Signer};
use ethexe_tx_pool::{
EthexeTransaction, InputTask, OutputTask, StandardInputTaskSender,
StandardTxPoolInstantiationArtifacts,
};
use ethexe_validator::BlockCommitmentValidationRequest;
use futures::{future, stream::StreamExt, FutureExt};
use gprimitives::H256;
Expand All @@ -55,6 +59,7 @@ pub struct Service {
processor: ethexe_processor::Processor,
signer: ethexe_signer::Signer,
block_time: Duration,
tx_pool_artifacts: StandardTxPoolInstantiationArtifacts,

// Optional services
network: Option<ethexe_network::NetworkService>,
Expand All @@ -79,6 +84,9 @@ pub enum NetworkMessage {
codes: Option<(Digest, Signature)>,
blocks: Option<(Digest, Signature)>,
},
Transaction {
transaction: EthexeTransaction,
},
}

impl Service {
Expand Down Expand Up @@ -200,10 +208,16 @@ impl Service {
})
.transpose()?;

let rpc = config
.rpc_config
.as_ref()
.map(|config| ethexe_rpc::RpcService::new(config.clone(), db.clone()));
log::info!("🚅 Tx pool service starting...");
let tx_pool_artifacts = ethexe_tx_pool::new((db.clone(),));

let rpc = config.rpc_config.as_ref().map(|config| {
ethexe_rpc::RpcService::new(
config.clone(),
db.clone(),
tx_pool_artifacts.input_sender.clone(),
)
});

Ok(Self {
db,
Expand All @@ -218,6 +232,7 @@ impl Service {
metrics_service,
rpc,
block_time: config.block_time,
tx_pool_artifacts,
})
}

Expand All @@ -244,6 +259,7 @@ impl Service {
validator: Option<ethexe_validator::Validator>,
metrics_service: Option<MetricsService>,
rpc: Option<ethexe_rpc::RpcService>,
tx_pool_artifacts: StandardTxPoolInstantiationArtifacts,
) -> Self {
Self {
db,
Expand All @@ -258,6 +274,7 @@ impl Service {
validator,
metrics_service,
rpc,
tx_pool_artifacts,
}
}

Expand Down Expand Up @@ -452,6 +469,7 @@ impl Service {
mut validator,
metrics_service,
rpc,
tx_pool_artifacts,
block_time,
} = self;

Expand Down Expand Up @@ -486,6 +504,13 @@ impl Service {
None
};

let StandardTxPoolInstantiationArtifacts {
service: tx_pool_service,
input_sender: tx_pool_input_task_sender,
output_receiver: mut tx_pool_ouput_task_receiver,
} = tx_pool_artifacts;
let mut tx_pool_handle = tokio::spawn(tx_pool_service.run());

let mut roles = "Observer".to_string();
if let Some(seq) = sequencer.as_ref() {
roles.push_str(&format!(", Sequencer ({})", seq.address()));
Expand Down Expand Up @@ -560,6 +585,7 @@ impl Service {
validator.as_mut(),
sequencer.as_mut(),
network_sender.as_mut(),
&tx_pool_input_task_sender,
);

if let Err(err) = result {
Expand All @@ -584,6 +610,10 @@ impl Service {
_ => {}
}
}
Some(task) = tx_pool_ouput_task_receiver.recv() => {
log::debug!("Received a task from the tx pool - {task:?}");
Self::process_tx_pool_output_task(task, network_sender.as_mut());
}
_ = maybe_await(network_handle.as_mut()) => {
log::info!("`NetworkWorker` has terminated, shutting down...");
break;
Expand All @@ -592,6 +622,10 @@ impl Service {
log::info!("`RPCWorker` has terminated, shutting down...");
break;
}
_ = &mut tx_pool_handle => {
log::info!("`TxPoolService` has terminated, shutting down...");
break;
}
}
}

Expand Down Expand Up @@ -630,7 +664,7 @@ impl Service {

if let Some(network_sender) = maybe_network_sender {
log::debug!("Publishing commitments to network...");
network_sender.publish_message(
network_sender.publish_commitment(
NetworkMessage::PublishCommitments {
codes: aggregated_codes.clone(),
blocks: aggregated_blocks.clone(),
Expand Down Expand Up @@ -719,7 +753,7 @@ impl Service {
codes: code_requests.clone(),
blocks: block_requests.clone(),
};
network_sender.publish_message(message.encode());
network_sender.publish_commitment(message.encode());
}

if let Some(validator) = maybe_validator {
Expand Down Expand Up @@ -775,6 +809,7 @@ impl Service {
maybe_validator: Option<&mut ethexe_validator::Validator>,
maybe_sequencer: Option<&mut ethexe_sequencer::Sequencer>,
maybe_network_sender: Option<&mut ethexe_network::NetworkSender>,
tx_pool_input_task_sender: &StandardInputTaskSender,
) -> Result<()> {
let message = NetworkMessage::decode(&mut data)?;
match message {
Expand Down Expand Up @@ -811,7 +846,7 @@ impl Service {
.transpose()?;

let message = NetworkMessage::ApproveCommitments { codes, blocks };
network_sender.publish_message(message.encode());
network_sender.publish_commitment(message.encode());

Ok(())
}
Expand All @@ -828,6 +863,21 @@ impl Service {
sequencer.receive_blocks_signature(digest, signature)?;
}

Ok(())
}
NetworkMessage::Transaction { transaction } => {
let _ = tx_pool_input_task_sender
.send(InputTask::AddTransaction {
transaction,
response_sender: None,
})
.inspect_err(|e| {
log::error!(
"Failed to send tx pool input task: {e}. \
The receiving end in the tx pool might have been dropped."
);
});

Ok(())
}
}
Expand Down Expand Up @@ -856,6 +906,21 @@ impl Service {

Ok(true)
}

fn process_tx_pool_output_task(
task: OutputTask<EthexeTransaction>,
mut maybe_network_sender: Option<&mut ethexe_network::NetworkSender>,
) {
match task {
OutputTask::PropogateTransaction { transaction } => {
if let Some(network_sender) = maybe_network_sender.as_mut() {
log::debug!("Publishing transaction to network...");
network_sender
.publish_transaction(NetworkMessage::Transaction { transaction }.encode());
}
}
}
}
}

mod utils {
Expand Down
Loading
Loading