Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FeeRate estimation API and FeeRate poller API for Wallet #130

Merged
merged 1 commit into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ impl KaspaCli {
Events::Metrics { network_id : _, metrics : _ } => {
// log_info!("Kaspa NG - received metrics event {metrics:?}")
}
Events::FeeRate { .. } => {},
Events::Error { message } => { terrorln!(this,"{message}"); },
Events::UtxoProcStart => {},
Events::UtxoProcStop => {},
Expand Down
50 changes: 50 additions & 0 deletions wallet/core/src/api/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use crate::imports::*;
use crate::tx::{Fees, GeneratorSummary, PaymentDestination};
use kaspa_addresses::Address;
use kaspa_rpc_core::RpcFeerateBucket;

#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -548,6 +549,55 @@ pub struct AccountsEstimateResponse {
pub generator_summary: GeneratorSummary,
}

#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeeRateEstimateBucket {
feerate: f64,
seconds: f64,
}

impl From<RpcFeerateBucket> for FeeRateEstimateBucket {
fn from(bucket: RpcFeerateBucket) -> Self {
Self { feerate: bucket.feerate, seconds: bucket.estimated_seconds }
}
}

impl From<&RpcFeerateBucket> for FeeRateEstimateBucket {
fn from(bucket: &RpcFeerateBucket) -> Self {
Self { feerate: bucket.feerate, seconds: bucket.estimated_seconds }
}
}

#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeeRateEstimateRequest {}

#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeeRateEstimateResponse {
pub priority: FeeRateEstimateBucket,
pub normal: FeeRateEstimateBucket,
pub low: FeeRateEstimateBucket,
}

#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeeRatePollerEnableRequest {
pub interval_seconds: u64,
}

#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeeRatePollerEnableResponse {}

#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeeRatePollerDisableRequest {}

#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeeRatePollerDisableResponse {}

#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
#[serde(rename_all = "camelCase")]
pub struct TransactionsDataGetRequest {
Expand Down
33 changes: 33 additions & 0 deletions wallet/core/src/api/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,39 @@ pub trait WalletApi: Send + Sync + AnySync {
/// an error.
async fn accounts_estimate_call(self: Arc<Self>, request: AccountsEstimateRequest) -> Result<AccountsEstimateResponse>;

/// Wrapper around [`accounts_estimate_call()`](Self::accounts_estimate_call)
async fn fee_rate_estimate(self: Arc<Self>) -> Result<FeeRateEstimateResponse> {
Ok(self.fee_rate_estimate_call(FeeRateEstimateRequest {}).await?)
}

/// Estimate current network fee rate. Returns a [`FeeRateEstimateResponse`]
async fn fee_rate_estimate_call(self: Arc<Self>, request: FeeRateEstimateRequest) -> Result<FeeRateEstimateResponse>;

/// Wrapper around [`fee_rate_poller_enable_call()`](Self::fee_rate_poller_enable_call).
async fn fee_rate_poller_enable(self: Arc<Self>, interval_seconds: u64) -> Result<()> {
self.fee_rate_poller_enable_call(FeeRatePollerEnableRequest { interval_seconds }).await?;
Ok(())
}

/// Enable the fee rate poller. The fee rate poller is a background task that
/// periodically polls the network for the current fee rate. The fee rate is
/// used to estimate the transaction fee. The poller is disabled by default.
/// This function stops the previously enabled poller and starts a new one
/// with the specified `interval`.
async fn fee_rate_poller_enable_call(self: Arc<Self>, request: FeeRatePollerEnableRequest) -> Result<FeeRatePollerEnableResponse>;

/// Wrapper around [`fee_rate_poller_disable_call()`](Self::fee_rate_poller_disable_call).
async fn fee_rate_poller_disable(self: Arc<Self>) -> Result<()> {
self.fee_rate_poller_disable_call(FeeRatePollerDisableRequest {}).await?;
Ok(())
}

/// Disable the fee rate poller.
async fn fee_rate_poller_disable_call(
self: Arc<Self>,
request: FeeRatePollerDisableRequest,
) -> Result<FeeRatePollerDisableResponse>;

/// Get a range of transaction records for a specific account id.
/// Wrapper around [`transactions_data_get_call()`](Self::transactions_data_get_call).
async fn transactions_data_get_range(
Expand Down
6 changes: 6 additions & 0 deletions wallet/core/src/api/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ impl WalletApi for WalletClient {
TransactionsReplaceNote,
TransactionsReplaceMetadata,
AddressBookEnumerate,
FeeRateEstimate,
FeeRatePollerEnable,
FeeRatePollerDisable,
]}
}

Expand Down Expand Up @@ -182,6 +185,9 @@ impl WalletServer {
TransactionsReplaceNote,
TransactionsReplaceMetadata,
AddressBookEnumerate,
FeeRateEstimate,
FeeRatePollerEnable,
FeeRatePollerDisable,
]}
}

Expand Down
10 changes: 10 additions & 0 deletions wallet/core/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! produced by the client RPC and the Kaspa node monitoring subsystems.
//!

use crate::api::message::FeeRateEstimateBucket;
use crate::imports::*;
use crate::storage::{Hint, PrvKeyDataInfo, StorageDescriptor, TransactionRecord, WalletDescriptor};
use crate::utxo::context::UtxoContextId;
Expand Down Expand Up @@ -221,6 +222,11 @@ pub enum Events {
// metrics_data: MetricsData,
metrics: MetricsUpdate,
},
FeeRate {
priority: FeeRateEstimateBucket,
normal: FeeRateEstimateBucket,
low: FeeRateEstimateBucket,
},
/// A general wallet framework error, emitted when an unexpected
/// error occurs within the wallet framework.
Error {
Expand Down Expand Up @@ -284,6 +290,7 @@ pub enum EventKind {
Discovery,
Balance,
Metrics,
FeeRate,
Error,
}

Expand Down Expand Up @@ -320,6 +327,7 @@ impl From<&Events> for EventKind {
Events::Discovery { .. } => EventKind::Discovery,
Events::Balance { .. } => EventKind::Balance,
Events::Metrics { .. } => EventKind::Metrics,
Events::FeeRate { .. } => EventKind::FeeRate,
Events::Error { .. } => EventKind::Error,
}
}
Expand Down Expand Up @@ -359,6 +367,7 @@ impl FromStr for EventKind {
"discovery" => Ok(EventKind::Discovery),
"balance" => Ok(EventKind::Balance),
"metrics" => Ok(EventKind::Metrics),
"fee-rate" => Ok(EventKind::FeeRate),
"error" => Ok(EventKind::Error),
_ => Err(Error::custom("Invalid event kind")),
}
Expand Down Expand Up @@ -406,6 +415,7 @@ impl std::fmt::Display for EventKind {
EventKind::Discovery => "discovery",
EventKind::Balance => "balance",
EventKind::Metrics => "metrics",
EventKind::FeeRate => "fee-rate",
EventKind::Error => "error",
};

Expand Down
48 changes: 47 additions & 1 deletion wallet/core/src/utxo/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use kaspa_rpc_core::{
ops::{RPC_API_REVISION, RPC_API_VERSION},
},
message::UtxosChangedNotification,
GetServerInfoResponse,
GetServerInfoResponse, RpcFeeEstimate,
};
use kaspa_wrpc_client::KaspaRpcClient;
use workflow_core::channel::{Channel, DuplexChannel, Sender};
Expand Down Expand Up @@ -61,6 +61,8 @@ pub struct Inner {
metrics: Arc<Metrics>,
metrics_kinds: Mutex<Vec<MetricsUpdateKind>>,
connection_signaler: Mutex<Option<Sender<std::result::Result<(), String>>>>,
fee_rate_task_ctl: DuplexChannel,
fee_rate_task_is_running: AtomicBool,
}

impl Inner {
Expand Down Expand Up @@ -91,6 +93,8 @@ impl Inner {
metrics: Arc::new(Metrics::default()),
metrics_kinds: Mutex::new(vec![]),
connection_signaler: Mutex::new(None),
fee_rate_task_ctl: DuplexChannel::oneshot(),
fee_rate_task_is_running: AtomicBool::new(false),
}
}
}
Expand Down Expand Up @@ -728,6 +732,48 @@ impl UtxoProcessor {
pub fn enable_metrics_kinds(&self, metrics_kinds: &[MetricsUpdateKind]) {
*self.inner.metrics_kinds.lock().unwrap() = metrics_kinds.to_vec();
}

pub async fn start_fee_rate_poller(&self, poller_interval: Duration) -> Result<()> {
self.stop_fee_rate_poller().await.ok();

let this = self.clone();
this.inner.fee_rate_task_is_running.store(true, Ordering::SeqCst);
let fee_rate_task_ctl_receiver = self.inner.fee_rate_task_ctl.request.receiver.clone();
let fee_rate_task_ctl_sender = self.inner.fee_rate_task_ctl.response.sender.clone();

let mut interval = workflow_core::task::interval(poller_interval);

spawn(async move {
loop {
select_biased! {
_ = interval.next().fuse() => {
if let Ok(fee_rate) = this.rpc_api().get_fee_estimate().await {
let RpcFeeEstimate { priority_bucket, normal_buckets, low_buckets } = fee_rate;
this.notify(Events::FeeRate {
priority : priority_bucket.into(),
normal : normal_buckets.first().expect("missing normal feerate bucket").into(),
low : low_buckets.first().expect("missing normal feerate bucket").into()
}).await.ok();
}
},
_ = fee_rate_task_ctl_receiver.recv().fuse() => {
break;
},
}
}

fee_rate_task_ctl_sender.send(()).await.unwrap();
});

Ok(())
}

pub async fn stop_fee_rate_poller(&self) -> Result<()> {
if self.inner.fee_rate_task_is_running.load(Ordering::SeqCst) {
self.inner.fee_rate_task_ctl.signal(()).await.expect("UtxoProcessor::stop_task() `signal` error");
}
Ok(())
}
}

#[cfg(test)]
Expand Down
25 changes: 25 additions & 0 deletions wallet/core/src/wallet/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::result::Result;
use crate::storage::interface::TransactionRangeResult;
use crate::storage::Binding;
use crate::tx::Fees;
use kaspa_rpc_core::RpcFeeEstimate;
use workflow_core::channel::Receiver;

#[async_trait]
Expand Down Expand Up @@ -513,4 +514,28 @@ impl WalletApi for super::Wallet {
) -> Result<AddressBookEnumerateResponse> {
return Err(Error::NotImplemented);
}

async fn fee_rate_estimate_call(self: Arc<Self>, _request: FeeRateEstimateRequest) -> Result<FeeRateEstimateResponse> {
let RpcFeeEstimate { priority_bucket, normal_buckets, low_buckets } = self.rpc_api().get_fee_estimate().await?;

Ok(FeeRateEstimateResponse {
priority: priority_bucket.into(),
normal: normal_buckets.first().ok_or(Error::custom("missing normal feerate bucket"))?.into(),
low: low_buckets.first().ok_or(Error::custom("missing normal feerate bucket"))?.into(),
})
}

async fn fee_rate_poller_enable_call(self: Arc<Self>, request: FeeRatePollerEnableRequest) -> Result<FeeRatePollerEnableResponse> {
let FeeRatePollerEnableRequest { interval_seconds } = request;
self.utxo_processor().start_fee_rate_poller(Duration::from_secs(interval_seconds)).await?;
Ok(FeeRatePollerEnableResponse {})
}

async fn fee_rate_poller_disable_call(
self: Arc<Self>,
_request: FeeRatePollerDisableRequest,
) -> Result<FeeRatePollerDisableResponse> {
self.utxo_processor().stop_fee_rate_poller().await?;
Ok(FeeRatePollerDisableResponse {})
}
}
85 changes: 85 additions & 0 deletions wallet/core/src/wasm/api/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1555,6 +1555,91 @@ try_from! ( args: AccountsEstimateResponse, IAccountsEstimateResponse, {

// ---

declare! {
IFeeRateEstimateBucket,
r#"
export interface IFeeRateEstimateBucket {
feeRate : number;
seconds : number;
}
"#,
}

declare! {
IFeeRateEstimateRequest,
r#"
export interface IFeeRateEstimateRequest { }
"#,
}

try_from! ( _args: IFeeRateEstimateRequest, FeeRateEstimateRequest, {
Ok(FeeRateEstimateRequest { })
});

declare! {
IFeeRateEstimateResponse,
r#"
export interface IFeeRateEstimateResponse {
priority : IFeeRateEstimateBucket,
normal : IFeeRateEstimateBucket,
low : IFeeRateEstimateBucket,
}
"#,
}

try_from! ( args: FeeRateEstimateResponse, IFeeRateEstimateResponse, {
Ok(to_value(&args)?.into())
});

declare! {
IFeeRatePollerEnableRequest,
r#"
export interface IFeeRatePollerEnableRequest {
intervalSeconds : number;
}
"#,
}

try_from! ( args: IFeeRatePollerEnableRequest, FeeRatePollerEnableRequest, {
let interval_seconds = args.get_u64("intervalSeconds")?;
Ok(FeeRatePollerEnableRequest { interval_seconds })
});

declare! {
IFeeRatePollerEnableResponse,
r#"
export interface IFeeRatePollerEnableResponse { }
"#,
}

try_from! ( _args: FeeRatePollerEnableResponse, IFeeRatePollerEnableResponse, {
Ok(IFeeRatePollerEnableResponse::default())
});

declare! {
IFeeRatePollerDisableRequest,
r#"
export interface IFeeRatePollerDisableRequest { }
"#,
}

try_from! ( _args: IFeeRatePollerDisableRequest, FeeRatePollerDisableRequest, {
Ok(FeeRatePollerDisableRequest { })
});

declare! {
IFeeRatePollerDisableResponse,
r#"
export interface IFeeRatePollerDisableResponse { }
"#,
}

try_from! ( _args: FeeRatePollerDisableResponse, IFeeRatePollerDisableResponse, {
Ok(IFeeRatePollerDisableResponse::default())
});

// ---

declare! {
ITransactionsDataGetRequest,
r#"
Expand Down
3 changes: 3 additions & 0 deletions wallet/core/src/wasm/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,7 @@ declare_wasm_handlers!([
TransactionsReplaceNote,
TransactionsReplaceMetadata,
AddressBookEnumerate,
FeeRateEstimate,
FeeRatePollerEnable,
FeeRatePollerDisable,
]);
Loading