Skip to content

Commit

Permalink
Add fastnear address to config, fix CI
Browse files Browse the repository at this point in the history
  • Loading branch information
Fly-Style committed Nov 30, 2024
1 parent 39555dd commit b5ecb80
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jobs:
run: make tests-integration

- name: Rust Fastnear Integration Test
run: cargo test --features it-tests
run: cargo test --features it_tests
working-directory: ./indexer

- name: Rust Integration Test
Expand Down
4 changes: 4 additions & 0 deletions indexer/src/configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ pub(crate) struct RunConfigArgs {
/// Metrics socket addr
#[clap(long)]
pub metrics_ip_port_address: Option<SocketAddr>,
/// Address of fastnear block producer.
#[clap(long, default_value = "https://testnet.neardata.xyz/v0/last_block/final")]
pub fastnear_address: String,
/// Internal FastIndexer channels width.
#[clap(long, default_value = "256")]
pub channel_width: usize
}
Expand Down
35 changes: 25 additions & 10 deletions indexer/src/fastnear_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,25 @@ use crate::{
},
};

const FASTNEAR_ENDPOINT: &str = "https://testnet.neardata.xyz/v0/last_block/final";
const FASTNEAR_INDEXER: &str = "fastnear_indexer";

pub struct FastNearIndexer {
client: Client,
fastnear_address: String,
addresses_to_rollup_ids: HashMap<AccountId, u32>,
listener: Option<BlockEventListener>,
channel_width: usize,
}

impl FastNearIndexer {
pub(crate) fn new(addresses_to_rollup_ids: HashMap<AccountId, u32>, channel_width: usize) -> Self {
pub(crate) fn new(
fastnear_address: &str,
addresses_to_rollup_ids: HashMap<AccountId, u32>,
channel_width: usize,
) -> Self {
debug!(FASTNEAR_INDEXER, "Creating new FastNearIndexer");
Self {
fastnear_address: fastnear_address.to_string(),
client: Client::new(),
addresses_to_rollup_ids,
listener: None,
Expand Down Expand Up @@ -83,7 +88,11 @@ impl FastNearIndexer {
if let Some(rollup_id) = addresses_to_rollup_ids.get(receiver_id) {
trace!(FASTNEAR_INDEXER, "Processing receipt for rollup_id: {}", rollup_id);
if !Self::is_successful_execution(&receipt_execution_outcome) {
trace!(FASTNEAR_INDEXER, "Skipping unsuccessful execution for rollup_id: {}", rollup_id);
trace!(
FASTNEAR_INDEXER,
"Skipping unsuccessful execution for rollup_id: {}",
rollup_id
);
continue;
}

Expand Down Expand Up @@ -113,17 +122,21 @@ impl FastNearIndexer {
info!(FASTNEAR_INDEXER, "Starting block stream");
let (block_sender, block_receiver) = mpsc::channel(self.channel_width);
let client = self.client.clone();
let fastnear_address = self.fastnear_address.clone();

tokio::spawn(async move {
loop {
match Self::fetch_latest_block(&client).await {
match Self::fetch_latest_block(&client, fastnear_address.as_str()).await {
Ok(block) => {
let block_height = block.block.header.height;
if block_sender.send(block).await.is_err() {
error!(FASTNEAR_INDEXER, "Failed to send block to channel");
break;
}
info!(FASTNEAR_INDEXER, "Successfully fetched and sent latest block with id: {}", block_height);
info!(
FASTNEAR_INDEXER,
"Successfully fetched and sent latest block with id: {}", block_height
);
}
Err(e) => error!(FASTNEAR_INDEXER, "Error fetching latest block: {:?}", e),
}
Expand All @@ -134,10 +147,10 @@ impl FastNearIndexer {
block_receiver
}

async fn fetch_latest_block(client: &Client) -> Result<BlockWithTxHashes, Error> {
async fn fetch_latest_block(client: &Client, fastnear_address: &str) -> Result<BlockWithTxHashes, Error> {
debug!(FASTNEAR_INDEXER, "Fetching latest block");
let response = client
.get(FASTNEAR_ENDPOINT)
.get(fastnear_address)
.send()
.await
.and_then(|r| r.error_for_status())
Expand Down Expand Up @@ -165,7 +178,7 @@ impl FastNearIndexer {
},
payload: PublishPayload {
transaction_id: candidate_data.tx_hash,
data: data.clone()
data: data.clone(),
},
};
sender.send(publish_data).await?
Expand Down Expand Up @@ -234,11 +247,13 @@ mod tests {
#[cfg(feature = "it_tests")]
use std::collections::HashMap;

const FASTNEAR_DEFAULT_ENDPOINT: &str = "https://testnet.neardata.xyz/v0/last_block/final";

#[cfg(all(test, feature = "it_tests"))]
#[tokio::test]
async fn test_run() {
let addresses_to_rollup_ids = HashMap::new();
let indexer = FastNearIndexer::new(addresses_to_rollup_ids, 128);
let indexer = FastNearIndexer::new(FASTNEAR_DEFAULT_ENDPOINT, addresses_to_rollup_ids, 128);
let receiver = indexer.run();
// Since the run method spawns asynchronous tasks, we can check if the receiver is valid
assert!(receiver.capacity() > 0);
Expand All @@ -248,7 +263,7 @@ mod tests {
#[tokio::test]
async fn test_fetch_latest_block() {
let client = Client::new();
let result = FastNearIndexer::fetch_latest_block(&client).await;
let result = FastNearIndexer::fetch_latest_block(&client, FASTNEAR_DEFAULT_ENDPOINT).await;
assert!(result.is_ok(), "Failed to fetch latest block");
let block = result.unwrap();
// Basic assertions about the block
Expand Down
7 changes: 4 additions & 3 deletions indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ mod fastnear_indexer;
const INDEXER: &str = "indexer";

fn run(home_dir: std::path::PathBuf, config: RunConfigArgs) -> Result<()> {
let addresses_to_rollup_ids = config.compile_addresses_to_ids_map()?;
let addr_to_rollup_ids = config.compile_addresses_to_ids_map()?;
let fastnear_addr: &str = config.fastnear_address.as_str();
let system = actix::System::new();
let registry = Registry::new();
let server_handle = if let Some(metrics_addr) = config.metrics_ip_port_address {
Expand All @@ -41,7 +42,7 @@ fn run(home_dir: std::path::PathBuf, config: RunConfigArgs) -> Result<()> {
}

if cfg!(feature = "use_fastnear") {
let fastnear_indexer = FastNearIndexer::new(addresses_to_rollup_ids, config.channel_width);
let fastnear_indexer = FastNearIndexer::new(fastnear_addr, addr_to_rollup_ids, config.channel_width);
let validated_stream = fastnear_indexer.run();

rmq_publisher.run(validated_stream);
Expand All @@ -55,7 +56,7 @@ fn run(home_dir: std::path::PathBuf, config: RunConfigArgs) -> Result<()> {
validate_genesis: true,
};

let mut indexer = IndexerWrapper::new(indexer_config, addresses_to_rollup_ids);
let mut indexer = IndexerWrapper::new(indexer_config, addr_to_rollup_ids);
if config.metrics_ip_port_address.is_some() {
indexer.enable_metrics(registry.clone())?;
}
Expand Down

0 comments on commit b5ecb80

Please sign in to comment.