Skip to content

Commit

Permalink
Support multiple sponsor addresses
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind committed Dec 16, 2024
1 parent cd0e521 commit 963754c
Show file tree
Hide file tree
Showing 11 changed files with 409 additions and 247 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ telemetry-subscribers = { git = "https://github.com/MystenLabs/sui", branch = "t

anyhow = "1.0.75"
async-trait = "0.1.51"
axum = {version = "0.6.6", features = ["headers"]}
axum = { version = "0.6.6", features = ["headers"] }
bcs = "0.1.6"
clap = "4.4.10"
chrono = "0.4.19"
Expand All @@ -53,6 +53,7 @@ tracing = "0.1.40"
tokio = { version = "1.36.0", features = ["full"] }
tokio-retry = "0.3.0"
serde_json = "1.0.108"
tokio-util = "0.7.10"

[dev-dependencies]
rand = "0.8.5"
Expand Down
6 changes: 3 additions & 3 deletions src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ impl Command {

let signer = signer_config.new_signer().await;
let storage_metrics = StorageMetrics::new(&prometheus_registry);
let sponsor_address = signer.get_address();
info!("Sponsor address: {:?}", sponsor_address);
let storage = connect_storage(&gas_pool_config, sponsor_address, storage_metrics).await;
let sponsor_addresses = signer.get_all_addresses();
info!("Sponsor addresses: {:?}", sponsor_addresses);
let storage = connect_storage(&gas_pool_config, sponsor_addresses, storage_metrics).await;
let sui_client = SuiClient::new(&fullnode_url, fullnode_basic_auth).await;
let _coin_init_task = if let Some(coin_init_config) = coin_init_config {
let task = GasPoolInitializer::start(
Expand Down
8 changes: 7 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl Default for GasPoolStorageConfig {
pub enum TxSignerConfig {
Local { keypair: SuiKeyPair },
Sidecar { sidecar_url: String },
MultiSidecar { sidecar_urls: Vec<String> },
}

impl Default for TxSignerConfig {
Expand All @@ -97,7 +98,12 @@ impl TxSignerConfig {
pub async fn new_signer(self) -> Arc<dyn TxSigner> {
match self {
TxSignerConfig::Local { keypair } => TestTxSigner::new(keypair),
TxSignerConfig::Sidecar { sidecar_url } => SidecarTxSigner::new(sidecar_url).await,
TxSignerConfig::Sidecar { sidecar_url } => {
SidecarTxSigner::new(vec![sidecar_url]).await
}
TxSignerConfig::MultiSidecar { sidecar_urls } => {
SidecarTxSigner::new(sidecar_urls).await
}
}
}
}
Expand Down
48 changes: 28 additions & 20 deletions src/gas_pool/gas_pool_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use sui_types::transaction::{
};
use tap::TapFallible;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};

use super::gas_usage_cap::GasUsageCap;
Expand All @@ -28,9 +29,8 @@ const EXPIRATION_JOB_INTERVAL: Duration = Duration::from_secs(1);

pub struct GasPoolContainer {
inner: Arc<GasPool>,
_coin_unlocker_task: JoinHandle<()>,
// This is always Some. It is None only after the drop method is called.
cancel_sender: Option<tokio::sync::oneshot::Sender<()>>,
_coin_unlocker_tasks: Vec<JoinHandle<()>>,
cancel: CancellationToken,
}

pub struct GasPool {
Expand Down Expand Up @@ -66,10 +66,10 @@ impl GasPool {
) -> anyhow::Result<(SuiAddress, ReservationID, Vec<ObjectRef>)> {
let cur_time = std::time::Instant::now();
self.gas_usage_cap.check_usage().await?;
let sponsor = self.signer.get_address();
let sponsor = self.signer.get_one_address();
let (reservation_id, gas_coins) = self
.gas_pool_store
.reserve_gas_coins(gas_budget, duration.as_millis() as u64)
.reserve_gas_coins(sponsor, gas_budget, duration.as_millis() as u64)
.await?;
let elapsed = cur_time.elapsed().as_millis();
self.metrics.reserve_gas_latency_ms.observe(elapsed as u64);
Expand Down Expand Up @@ -106,7 +106,7 @@ impl GasPool {
"Payment coins in transaction: {:?}", payment
);
self.gas_pool_store
.ready_for_execution(reservation_id)
.ready_for_execution(sponsor, reservation_id)
.await?;
debug!(?reservation_id, "Reservation is ready for execution");

Expand Down Expand Up @@ -161,7 +161,7 @@ impl GasPool {
// Regardless of whether the transaction succeeded, we need to release the coins.
// Otherwise, we lose track of them. This is because `ready_for_execution` already takes
// the coins out of the pool and will not be covered by the auto-release mechanism.
self.release_gas_coins(updated_coins).await;
self.release_gas_coins(sponsor, updated_coins).await;
if smashed_coin_count > 0 {
info!(
?reservation_id,
Expand Down Expand Up @@ -260,11 +260,11 @@ impl GasPool {
}

/// Release gas coins back to the gas pool, by adding them to the storage.
async fn release_gas_coins(&self, gas_coins: Vec<GasCoin>) {
async fn release_gas_coins(&self, sponsor: SuiAddress, gas_coins: Vec<GasCoin>) {
debug!("Trying to release gas coins: {:?}", gas_coins);
retry_forever!(async {
self.gas_pool_store
.add_new_coins(gas_coins.clone())
.add_new_coins(sponsor, gas_coins.clone())
.await
.tap_err(|err| error!("Failed to call update_gas_coins on storage: {:?}", err))
})
Expand Down Expand Up @@ -293,11 +293,12 @@ impl GasPool {

async fn start_coin_unlock_task(
self: Arc<Self>,
mut cancel_receiver: tokio::sync::oneshot::Receiver<()>,
sponsor: SuiAddress,
cancel: CancellationToken,
) -> JoinHandle<()> {
tokio::task::spawn(async move {
loop {
let expire_results = self.gas_pool_store.expire_coins().await;
let expire_results = self.gas_pool_store.expire_coins(sponsor).await;
let unlocked_coins = expire_results.unwrap_or_else(|err| {
error!("Failed to call expire_coins to the storage: {:?}", err);
vec![]
Expand All @@ -312,12 +313,12 @@ impl GasPool {
.flatten()
.collect();
let count = latest_coins.len();
self.release_gas_coins(latest_coins).await;
self.release_gas_coins(sponsor, latest_coins).await;
info!("Released {:?} coins after expiration", count);
}
tokio::select! {
_ = tokio::time::sleep(EXPIRATION_JOB_INTERVAL) => {}
_ = &mut cancel_receiver => {
_ = cancel.cancelled() => {
info!("Coin unlocker task is cancelled");
break;
}
Expand All @@ -326,9 +327,9 @@ impl GasPool {
})
}

pub async fn query_pool_available_coin_count(&self) -> usize {
pub async fn query_pool_available_coin_count(&self, sponsor: SuiAddress) -> usize {
self.gas_pool_store
.get_available_coin_count()
.get_available_coin_count(sponsor)
.await
.unwrap()
}
Expand All @@ -342,6 +343,7 @@ impl GasPoolContainer {
gas_usage_daily_cap: u64,
metrics: Arc<GasPoolCoreMetrics>,
) -> Self {
let sponsor_addresses = signer.get_all_addresses();
let inner = GasPool::new(
signer,
gas_pool_store,
Expand All @@ -350,13 +352,19 @@ impl GasPoolContainer {
Arc::new(GasUsageCap::new(gas_usage_daily_cap)),
)
.await;
let (cancel_sender, cancel_receiver) = tokio::sync::oneshot::channel();
let _coin_unlocker_task = inner.clone().start_coin_unlock_task(cancel_receiver).await;
let cancel = CancellationToken::new();

let mut _coin_unlocker_tasks = vec![];
for sponsor in sponsor_addresses {
let inner = inner.clone();
let task = inner.start_coin_unlock_task(sponsor, cancel.clone()).await;
_coin_unlocker_tasks.push(task);
}

Self {
inner,
_coin_unlocker_task,
cancel_sender: Some(cancel_sender),
_coin_unlocker_tasks,
cancel,
}
}

Expand All @@ -367,6 +375,6 @@ impl GasPoolContainer {

impl Drop for GasPoolContainer {
fn drop(&mut self) {
self.cancel_sender.take().unwrap().send(()).unwrap();
self.cancel.cancel();
}
}
14 changes: 7 additions & 7 deletions src/gas_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ mod tests {
.await
.unwrap();
assert_eq!(gas_coins.len(), 3);
assert_eq!(station.query_pool_available_coin_count().await, 7);
assert_eq!(station.query_pool_available_coin_count(sponsor1).await, 7);
let (sponsor2, _res_id2, gas_coins) = station
.reserve_gas(MIST_PER_SUI * 7, Duration::from_secs(10))
.await
.unwrap();
assert_eq!(gas_coins.len(), 7);
assert_eq!(sponsor1, sponsor2);
assert_eq!(station.query_pool_available_coin_count().await, 0);
assert_eq!(station.query_pool_available_coin_count(sponsor2).await, 0);
assert!(station
.reserve_gas(1, Duration::from_secs(10))
.await
Expand All @@ -55,7 +55,7 @@ mod tests {
.await
.unwrap();
assert_eq!(gas_coins.len(), 1);
assert_eq!(station.query_pool_available_coin_count().await, 0);
assert_eq!(station.query_pool_available_coin_count(sponsor).await, 0);
assert!(station
.reserve_gas(1, Duration::from_secs(10))
.await
Expand All @@ -67,7 +67,7 @@ mod tests {
.await
.unwrap();
assert!(effects.status().is_ok());
assert_eq!(station.query_pool_available_coin_count().await, 1);
assert_eq!(station.query_pool_available_coin_count(sponsor).await, 1);
}

#[tokio::test]
Expand All @@ -93,7 +93,7 @@ mod tests {
.await;
println!("{:?}", result);
assert!(result.is_err());
assert_eq!(station.query_pool_available_coin_count().await, 1);
assert_eq!(station.query_pool_available_coin_count(sponsor).await, 1);
}

#[tokio::test]
Expand All @@ -106,14 +106,14 @@ mod tests {
.await
.unwrap();
assert_eq!(gas_coins.len(), 1);
assert_eq!(station.query_pool_available_coin_count().await, 0);
assert_eq!(station.query_pool_available_coin_count(sponsor).await, 0);
assert!(station
.reserve_gas(1, Duration::from_secs(1))
.await
.is_err());
// Sleep a little longer to give it enough time to expire.
tokio::time::sleep(Duration::from_secs(5)).await;
assert_eq!(station.query_pool_available_coin_count().await, 1);
assert_eq!(station.query_pool_available_coin_count(sponsor).await, 1);
let (tx_data, user_sig) = create_test_transaction(&test_cluster, sponsor, gas_coins).await;
assert!(station
.execute_transaction(reservation_id, tx_data, user_sig)
Expand Down
Loading

0 comments on commit 963754c

Please sign in to comment.