Skip to content

Commit

Permalink
fix: add new validators on initial query and becomde validator tx (#130)
Browse files Browse the repository at this point in the history
* fix: query validator set in initial query

* fix: add new validators on become validator

* fix: box become validator data
  • Loading branch information
mateuszjasiuk authored Oct 9, 2024
1 parent 1ecfb66 commit 667abcf
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 44 deletions.
64 changes: 34 additions & 30 deletions chain/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::convert::identity;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use chain::app_state::AppState;
use chain::config::AppConfig;
use chain::repository;
use chain::services::db::get_pos_crawler_state;
use chain::services::namada::{
query_all_balances, query_all_bonds_and_unbonds, query_all_proposals,
query_bonds, query_last_block_height, query_tokens,
Expand All @@ -29,8 +27,8 @@ use shared::crawler_state::ChainCrawlerState;
use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError};
use shared::id::Id;
use shared::token::Token;
use shared::validator::ValidatorSet;
use tendermint_rpc::HttpClient;
use tokio::time::sleep;
use tracing::Level;
use tracing_subscriber::FmtSubscriber;

Expand Down Expand Up @@ -75,7 +73,7 @@ async fn main() -> Result<(), MainError> {
.context_db_interact_error()
.into_db_error()?;

initial_query(&client, &conn, config.initial_query_retry_time).await?;
initial_query(&client, &conn).await?;

let crawler_state = db_service::get_chain_crawler_state(&conn)
.await
Expand Down Expand Up @@ -182,6 +180,12 @@ async fn crawling_fn(
let proposals_votes = block.governance_votes();
tracing::info!("Creating {} governance votes...", proposals_votes.len());

let validators = block.validators();
let validator_set = ValidatorSet {
validators: validators.clone(),
epoch,
};

let addresses = block.bond_addresses();
let bonds = query_bonds(&client, addresses).await.into_rpc_error()?;
tracing::info!("Updating bonds for {} addresses", bonds.len());
Expand Down Expand Up @@ -248,6 +252,11 @@ async fn crawling_fn(
proposals_votes,
)?;

repository::pos::upsert_validators(
transaction_conn,
validator_set,
)?;

// We first remove all the bonds and then insert the new ones
repository::pos::clear_bonds(
transaction_conn,
Expand Down Expand Up @@ -295,44 +304,34 @@ async fn crawling_fn(
async fn initial_query(
client: &HttpClient,
conn: &Object,
initial_query_retry_time: u64,
) -> Result<(), MainError> {
tracing::info!("Querying initial data...");
let block_height =
query_last_block_height(client).await.into_rpc_error()?;
let mut epoch =
namada_service::get_epoch_at_block_height(client, block_height)
.await
.into_rpc_error()?;
let epoch = namada_service::get_epoch_at_block_height(client, block_height)
.await
.into_rpc_error()?;
let first_block_in_epoch = namada_service::get_first_block_in_epoch(client)
.await
.into_rpc_error()?;

loop {
let pos_crawler_state =
get_pos_crawler_state(conn).await.into_db_error();

match pos_crawler_state {
// >= in case epochs are really short
Ok(pos_crawler_state)
if pos_crawler_state.last_processed_epoch >= epoch =>
{
// We assign pos crawler epoch as epoch to process
epoch = pos_crawler_state.last_processed_epoch;
break;
}
_ => {}
}

tracing::info!("Waiting for PoS service update...");

sleep(Duration::from_secs(initial_query_retry_time)).await;
}

let tokens = query_tokens(client).await.into_rpc_error()?;

let balances = query_all_balances(client).await.into_rpc_error()?;

tracing::info!("Querying validators set...");
let pipeline_length = namada_service::query_pipeline_length(client)
.await
.into_rpc_error()?;
// We need to add pipeline_length to the epoch as it is possible to bond in
// advance
let validator_set = namada_service::get_validator_set_at_epoch(
client,
epoch + pipeline_length as u32,
)
.await
.into_rpc_error()?;

tracing::info!("Querying bonds and unbonds...");
let (bonds, unbonds) = query_all_bonds_and_unbonds(client, None, None)
.await
Expand Down Expand Up @@ -384,6 +383,11 @@ async fn initial_query(
proposals_votes,
)?;

repository::pos::upsert_validators(
transaction_conn,
validator_set,
)?;

repository::pos::insert_bonds(transaction_conn, bonds)?;
repository::pos::insert_unbonds(transaction_conn, unbonds)?;

Expand Down
44 changes: 42 additions & 2 deletions chain/src/repository/pos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ use diesel::{
use orm::bond::BondInsertDb;
use orm::schema::{bonds, pos_rewards, unbonds, validators};
use orm::unbond::UnbondInsertDb;
use orm::validators::{ValidatorDb, ValidatorUpdateMetadataDb};
use orm::validators::{
ValidatorDb, ValidatorInsertDb, ValidatorUpdateMetadataDb,
};
use shared::block::Epoch;
use shared::bond::Bonds;
use shared::id::Id;
use shared::unbond::{UnbondAddresses, Unbonds};
use shared::validator::ValidatorMetadataChange;
use shared::validator::{ValidatorMetadataChange, ValidatorSet};

pub fn clear_bonds(
transaction_conn: &mut PgConnection,
Expand Down Expand Up @@ -198,6 +200,44 @@ pub fn update_validator_metadata(
anyhow::Ok(())
}

pub fn upsert_validators(
transaction_conn: &mut PgConnection,
validators_set: ValidatorSet,
) -> anyhow::Result<()> {
let validators_db = &validators_set
.validators
.into_iter()
.map(ValidatorInsertDb::from_validator)
.collect::<Vec<_>>();

diesel::insert_into(validators::table)
.values::<&Vec<ValidatorInsertDb>>(validators_db)
.on_conflict(validators::columns::namada_address)
.do_update()
.set((
validators::columns::voting_power
.eq(excluded(validators::columns::voting_power)),
validators::columns::max_commission
.eq(excluded(validators::columns::max_commission)),
validators::columns::commission
.eq(excluded(validators::columns::commission)),
validators::columns::email.eq(excluded(validators::columns::email)),
validators::columns::website
.eq(excluded(validators::columns::website)),
validators::columns::description
.eq(excluded(validators::columns::description)),
validators::columns::discord_handle
.eq(excluded(validators::columns::discord_handle)),
validators::columns::avatar
.eq(excluded(validators::columns::avatar)),
validators::columns::state.eq(excluded(validators::columns::state)),
))
.execute(transaction_conn)
.context("Failed to update validators in db")?;

Ok(())
}

#[cfg(test)]
mod tests {
use orm::bond::BondDb;
Expand Down
93 changes: 92 additions & 1 deletion chain/src/services/namada.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashSet;
use std::str::FromStr;

use anyhow::{anyhow, Context};
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use namada_core::chain::{
BlockHeight as NamadaSdkBlockHeight, Epoch as NamadaSdkEpoch,
};
Expand All @@ -26,6 +26,7 @@ use shared::proposal::{GovernanceProposal, TallyType};
use shared::token::{IbcToken, Token};
use shared::unbond::{Unbond, UnbondAddresses, Unbonds};
use shared::utils::BalanceChange;
use shared::validator::{Validator, ValidatorSet, ValidatorState};
use shared::vote::{GovernanceVote, ProposalVoteKind};
use subtle_encoding::hex;
use tendermint_rpc::HttpClient;
Expand Down Expand Up @@ -596,6 +597,96 @@ pub async fn query_all_votes(
anyhow::Ok(votes.iter().flatten().cloned().collect())
}

pub async fn get_validator_set_at_epoch(
client: &HttpClient,
epoch: Epoch,
) -> anyhow::Result<ValidatorSet> {
let namada_epoch = NamadaSdkEpoch::from(epoch as u64);
let validator_set = rpc::get_all_validators(client, namada_epoch)
.await
.with_context(|| {
format!(
"Failed to query Namada's consensus validators at epoch \
{epoch}"
)
})?;

let validators = futures::stream::iter(validator_set)
.map(|address| async move {
let voting_power_fut = async {
rpc::get_validator_stake(client, namada_epoch, &address)
.await
.with_context(|| {
format!(
"Failed to query the stake of validator {address} \
at epoch {namada_epoch}"
)
})
};

let commission_fut = async {
rpc::query_commission_rate(client, &address, Some(namada_epoch))
.await
.with_context(|| {
format!(
"Failed to query commission of validator \
{address} at epoch {namada_epoch}"
)
})
};

let validator_state_fut = async {
rpc::get_validator_state(client, &address, Some(namada_epoch))
.await
.with_context(|| {
format!(
"Failed to query validator {address} \
state"
)
})
};

let (voting_power, commission_pair, validator_state) =
futures::try_join!(voting_power_fut, commission_fut, validator_state_fut)?;
let commission = commission_pair
.commission_rate
.expect("Commission rate has to exist")
.to_string();
let max_commission = commission_pair
.max_commission_change_per_epoch
.expect("Max commission rate change has to exist")
.to_string();
let validator_state = validator_state.0.map(ValidatorState::from).unwrap_or(ValidatorState::Unknown);

anyhow::Ok(Validator {
address: Id::Account(address.to_string()),
voting_power: voting_power.to_string_native(),
max_commission,
commission,
name: None,
email: None,
description: None,
website: None,
discord_handler: None,
avatar: None,
state: validator_state
})
})
.buffer_unordered(100)
.try_collect::<HashSet<_>>()
.await?;

Ok(ValidatorSet { validators, epoch })
}

pub async fn query_pipeline_length(client: &HttpClient) -> anyhow::Result<u64> {
let pos_parameters = rpc::get_pos_params(client)
.await
.with_context(|| "Failed to query pos parameters".to_string())?;

Ok(pos_parameters.pipeline_len)
}

fn to_block_height(block_height: u32) -> NamadaSdkBlockHeight {
NamadaSdkBlockHeight::from(block_height as u64)
}
4 changes: 4 additions & 0 deletions orm/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub enum TransactionKindDb {
ChangeMetadata,
ChangeCommission,
RevealPk,
BecomeValidator,
Unknown,
}

Expand Down Expand Up @@ -54,6 +55,9 @@ impl From<TransactionKind> for TransactionKindDb {
TransactionKindDb::ChangeCommission
}
TransactionKind::RevealPk(_) => TransactionKindDb::RevealPk,
TransactionKind::BecomeValidator(_) => {
TransactionKindDb::BecomeValidator
}
TransactionKind::Unknown => TransactionKindDb::Unknown,
}
}
Expand Down
4 changes: 3 additions & 1 deletion pos/src/services/namada.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashSet;

use anyhow::Context;
use futures::{StreamExt, TryStreamExt};
use namada_core::chain::Epoch as NamadaSdkEpoch;
Expand Down Expand Up @@ -83,7 +85,7 @@ pub async fn get_validator_set_at_epoch(
})
})
.buffer_unordered(100)
.try_collect::<Vec<_>>()
.try_collect::<HashSet<_>>()
.await?;

Ok(ValidatorSet { validators, epoch })
Expand Down
34 changes: 33 additions & 1 deletion shared/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::transaction::{
};
use crate::unbond::UnbondAddresses;
use crate::utils::BalanceChange;
use crate::validator::ValidatorMetadataChange;
use crate::validator::{Validator, ValidatorMetadataChange, ValidatorState};
use crate::vote::GovernanceVote;

pub type Epoch = u32;
Expand Down Expand Up @@ -490,6 +490,38 @@ impl Block {
Some(recv_msg)
}

pub fn validators(&self) -> HashSet<Validator> {
self.transactions
.iter()
.flat_map(|(_, txs)| txs)
.filter(|tx| {
tx.data.is_some()
&& tx.exit_code == TransactionExitStatus::Applied
})
.filter_map(|tx| match &tx.kind {
TransactionKind::BecomeValidator(data) => {
let data = data.clone().unwrap();
Some(Validator {
address: Id::from(data.address),
voting_power: "0".to_string(),
max_commission: data
.max_commission_rate_change
.to_string(),
commission: data.commission_rate.to_string(),
name: data.name,
email: Some(data.email),
description: data.description,
website: data.website,
discord_handler: data.discord_handle,
avatar: data.avatar,
state: ValidatorState::Inactive,
})
}
_ => None,
})
.collect()
}

pub fn bond_addresses(&self) -> HashSet<BondAddresses> {
self.transactions
.iter()
Expand Down
Loading

0 comments on commit 667abcf

Please sign in to comment.