Skip to content

Commit

Permalink
Review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Fly-Style committed Nov 29, 2024
1 parent 05bda85 commit 8d961f4
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 52 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ jobs:
- name: Go Integration Test
run: make tests-integration

- name: Rust Fastnear Integration Test
run: cargo test --features it-tests
working-directory: ./indexer

- name: Rust Integration Test
run: cargo test --features it-tests
working-directory: ./tests/e2e/e2e_tests
Expand Down
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ lapin = "2.3.1"
deadpool-lapin = "0.11.0"
tokio-executor-trait = "2.1.0"
tokio-reactor-trait = "1.1.0"
tokio-retry = "0.3"
prometheus = "0.13.3"

clap = { version = "4.5.21", features = ["color", "derive", "env"] }
Expand All @@ -41,3 +42,4 @@ near-crypto = { git = "https://github.com/near/nearcore", rev = "a83c18490cf4daf

[features]
use_fastnear = []
it_tests = []
2 changes: 2 additions & 0 deletions indexer/src/configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ pub(crate) struct RunConfigArgs {
/// Metrics socket addr
#[clap(long)]
pub metrics_ip_port_address: Option<SocketAddr>,
#[clap(long, default_value = "256")]
pub channel_width: usize
}

impl RunConfigArgs {
Expand Down
121 changes: 70 additions & 51 deletions indexer/src/fastnear_indexer.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,51 @@
use near_indexer::near_primitives::{types::AccountId, views::{ActionView, ExecutionStatusView, ReceiptEnumView}};
use near_indexer::near_primitives::{
types::AccountId,
views::{ActionView, ExecutionStatusView, ReceiptEnumView},
};
use reqwest::Client;
use std::collections::HashMap;
use tokio::sync::{mpsc, mpsc::{Receiver, Sender}};
use tokio::sync::{
mpsc,
mpsc::{Receiver, Sender},
};
use tracing::{debug, error, info, trace};

use crate::errors::Result;
use crate::metrics::{make_block_listener_metrics, BlockEventListener, Metricable};
use crate::{errors::Error, rmq_publisher::{get_routing_key, PublishData, PublishOptions, PublishPayload, PublisherContext}, types::{BlockWithTxHashes, IndexerExecutionOutcomeWithReceiptAndTxHash, PartialCandidateData, PartialCandidateDataWithBlockTxHash}};
use crate::{
errors::Error,
rmq_publisher::{get_routing_key, PublishData, PublishOptions, PublishPayload, PublisherContext},
types::{
BlockWithTxHashes, IndexerExecutionOutcomeWithReceiptAndTxHash, PartialCandidateData,
PartialCandidateDataWithBlockTxHash,
},
};

const FASTNEAR_ENDPOINT: &str = "https://testnet.neardata.xyz/v0/last_block/final";
const FASTNEAR_INDEXER: &str = "fastnear_indexer";

pub struct FastNearIndexer {
client: Client,
addresses_to_rollup_ids: HashMap<AccountId, u32>,
listener: Option<BlockEventListener>,
channel_width: usize,
}

impl FastNearIndexer {
pub(crate) fn new(addresses_to_rollup_ids: HashMap<AccountId, u32>) -> Self {
debug!(target: "fastnear_indexer", "Creating new FastNearIndexer");
Self {
pub(crate) fn new(addresses_to_rollup_ids: HashMap<AccountId, u32>, channel_width: usize) -> Self {
debug!(FASTNEAR_INDEXER, "Creating new FastNearIndexer");
Self {
client: Client::new(),
addresses_to_rollup_ids,
listener: None,
channel_width,
}
}

pub fn run(&self) -> Receiver<PublishData> {
info!(target: "fastnear_indexer", "Starting FastNearIndexer");
info!(FASTNEAR_INDEXER, "Starting FastNearIndexer");
let block_receiver = self.stream_latest_blocks();
let (publish_sender, publish_receiver) = mpsc::channel(100);
let (publish_sender, publish_receiver) = mpsc::channel(self.channel_width);

let addresses_to_rollup_ids = self.addresses_to_rollup_ids.clone();

Expand All @@ -45,11 +61,11 @@ impl FastNearIndexer {
publish_sender: Sender<PublishData>,
addresses_to_rollup_ids: HashMap<AccountId, u32>,
) {
debug!(target: "fastnear_indexer", "Starting block processing");
debug!(FASTNEAR_INDEXER, "Starting block processing");
while let Some(block) = block_receiver.recv().await {
trace!(target: "fastnear_indexer", "Received block: {:?}", block.block.header.height);
trace!(FASTNEAR_INDEXER, "Received block: {:?}", block.block.header.height);
if let Err(e) = Self::parse_and_publish_block(block, &publish_sender, &addresses_to_rollup_ids).await {
error!(target: "fastnear_indexer", "Error parsing and publishing block: {:?}", e);
error!(FASTNEAR_INDEXER, "Error parsing and publishing block: {:?}", e);
}
}
}
Expand All @@ -59,31 +75,31 @@ impl FastNearIndexer {
publish_sender: &Sender<PublishData>,
addresses_to_rollup_ids: &HashMap<AccountId, u32>,
) -> Result<(), Error> {
debug!(target: "fastnear_indexer", "Parsing block: {:?}", block.block.header.height);
debug!(FASTNEAR_INDEXER, "Parsing block: {:?}", block.block.header.height);
for shard in block.shards {
for receipt_execution_outcome in shard.receipt_execution_outcomes {
let receiver_id = &receipt_execution_outcome.receipt.receiver_id;
debug!(target: "fastnear_indexer", "Processing receipt for receiver_id: {}", receiver_id);
debug!(FASTNEAR_INDEXER, "Processing receipt for receiver_id: {}", receiver_id);
if let Some(rollup_id) = addresses_to_rollup_ids.get(receiver_id) {
trace!(target: "fastnear_indexer", "Processing receipt for rollup_id: {}", rollup_id);
trace!(FASTNEAR_INDEXER, "Processing receipt for rollup_id: {}", rollup_id);
if !Self::is_successful_execution(&receipt_execution_outcome) {
trace!(target: "fastnear_indexer", "Skipping unsuccessful execution for rollup_id: {}", rollup_id);
trace!(FASTNEAR_INDEXER, "Skipping unsuccessful execution for rollup_id: {}", rollup_id);
continue;
}

let partial_candidate_data = Self::receipt_filter_map(
receipt_execution_outcome.receipt.receipt,
*rollup_id
);
let partial_candidate_data =
Self::receipt_filter_map(receipt_execution_outcome.receipt.receipt, *rollup_id);

if let (Some(partial_data), Some(tx_hash)) = (partial_candidate_data, receipt_execution_outcome.tx_hash) {
if let (Some(partial_data), Some(tx_hash)) =
(partial_candidate_data, receipt_execution_outcome.tx_hash)
{
let candidate_data = PartialCandidateDataWithBlockTxHash {
rollup_id: *rollup_id,
payloads: partial_data.payloads,
tx_hash,
block_hash: block.block.header.hash,
};
debug!(target: "fastnear_indexer", "Sending candidate data for rollup_id: {}", rollup_id);
debug!(FASTNEAR_INDEXER, "Sending candidate data for rollup_id: {}", rollup_id);
Self::send(&candidate_data, publish_sender).await?;
}
}
Expand All @@ -94,21 +110,21 @@ impl FastNearIndexer {
}

pub fn stream_latest_blocks(&self) -> mpsc::Receiver<BlockWithTxHashes> {
info!(target: "fastnear_indexer", "Starting block stream");
let (block_sender, block_receiver) = mpsc::channel(100);
info!(FASTNEAR_INDEXER, "Starting block stream");
let (block_sender, block_receiver) = mpsc::channel(self.channel_width);
let client = self.client.clone();

tokio::spawn(async move {
loop {
match Self::fetch_latest_block(&client).await {
Ok(block) => {
if block_sender.send(block.clone()).await.is_err() {
error!(target: "fastnear_indexer", "Failed to send block to channel");
error!(FASTNEAR_INDEXER, "Failed to send block to channel");
break;
}
info!(target: "fastnear_indexer", "Successfully fetched and sent latest block with id: {}", block.block.header.height);
info!(FASTNEAR_INDEXER, "Successfully fetched and sent latest block with id: {}", block.block.header.height);
}
Err(e) => error!(target: "fastnear_indexer", "Error fetching latest block: {:?}", e),
Err(e) => error!(FASTNEAR_INDEXER, "Error fetching latest block: {:?}", e),
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
Expand All @@ -118,20 +134,25 @@ impl FastNearIndexer {
}

async fn fetch_latest_block(client: &Client) -> Result<BlockWithTxHashes, Error> {
debug!(target: "fastnear_indexer", "Fetching latest block");
let response = client.get(FASTNEAR_ENDPOINT)
debug!(FASTNEAR_INDEXER, "Fetching latest block");
let response = client
.get(FASTNEAR_ENDPOINT)
.send()
.await
.and_then(|r| r.error_for_status())
.map_err(|e| Error::NetworkError(e.to_string()))?;

response.json::<BlockWithTxHashes>()
response
.json::<BlockWithTxHashes>()
.await
.map_err(|e| Error::DeserializeJsonError(e.to_string()))
}

async fn send(candidate_data: &PartialCandidateDataWithBlockTxHash, sender: &Sender<PublishData>) -> Result<(), Error> {
trace!(target: "fastnear_indexer", "Sending candidate data: {:?}", candidate_data);
async fn send(
candidate_data: &PartialCandidateDataWithBlockTxHash,
sender: &Sender<PublishData>,
) -> Result<(), Error> {
trace!(FASTNEAR_INDEXER, "Sending candidate data: {:?}", candidate_data);
for data in candidate_data.clone().payloads {
let publish_data = PublishData {
publish_options: PublishOptions {
Expand All @@ -157,42 +178,38 @@ impl FastNearIndexer {
receipt_execution_outcome.execution_outcome.outcome.status,
ExecutionStatusView::SuccessValue(ref value) if value.is_empty()
);
trace!(target: "fastnear_indexer", "Execution successful: {}", is_successful);
trace!(FASTNEAR_INDEXER, "Execution successful: {}", is_successful);
is_successful
}

fn receipt_filter_map(receipt_enum_view: ReceiptEnumView, rollup_id: u32) -> Option<PartialCandidateData> {
trace!(target: "fastnear_indexer", "Filtering receipt for rollup_id: {}", rollup_id);
trace!(FASTNEAR_INDEXER, "Filtering receipt for rollup_id: {}", rollup_id);
let payloads = match receipt_enum_view {
ReceiptEnumView::Action { actions, .. } => {
actions.into_iter()
.filter_map(Self::extract_args)
.collect::<Vec<Vec<u8>>>()
}
ReceiptEnumView::Action { actions, .. } => actions
.into_iter()
.filter_map(Self::extract_args)
.collect::<Vec<Vec<u8>>>(),
_ => return None,
};

if payloads.is_empty() {
trace!(target: "fastnear_indexer", "No payloads found for rollup_id: {}", rollup_id);
trace!(FASTNEAR_INDEXER, "No payloads found for rollup_id: {}", rollup_id);
return None;
}

Some(PartialCandidateData {
rollup_id,
payloads,
})
Some(PartialCandidateData { rollup_id, payloads })
}

fn extract_args(action: ActionView) -> Option<Vec<u8>> {
match action {
ActionView::FunctionCall { method_name, args, .. } if method_name == "submit" => {
trace!(target: "fastnear_indexer", "Extracted args for 'submit' method");
trace!(FASTNEAR_INDEXER, "Extracted args for 'submit' method");
Some(args.into())
},
}
_ => {
trace!(target: "fastnear_indexer", "Skipped non-'submit' method");
trace!(FASTNEAR_INDEXER, "Skipped non-'submit' method");
None
},
}
}
}
}
Expand All @@ -211,19 +228,21 @@ mod tests {
use super::*;
use near_crypto::{KeyType, PublicKey};
use near_indexer::near_primitives::views::{ActionView, ReceiptEnumView};
#[cfg(feature = "it_tests")]
use reqwest::Client;
#[cfg(feature = "it_tests")]
use std::collections::HashMap;

#[tokio::test]
#[cfg(all(test, feature = "it_tests"))]
async fn test_run() {
let addresses_to_rollup_ids = HashMap::new();
let indexer = FastNearIndexer::new(addresses_to_rollup_ids);
let indexer = FastNearIndexer::new(addresses_to_rollup_ids, 128);
let receiver = indexer.run();
// Since the run method spawns asynchronous tasks, we can check if the receiver is valid
assert!(receiver.capacity() > 0);
}

#[tokio::test]
#[cfg(all(test, feature = "it_tests"))]
async fn test_fetch_latest_block() {
let client = Client::new();
let result = FastNearIndexer::fetch_latest_block(&client).await;
Expand Down Expand Up @@ -298,4 +317,4 @@ mod tests {
let result = FastNearIndexer::receipt_filter_map(receipt_enum_view, rollup_id);
assert!(result.is_none());
}
}
}
2 changes: 1 addition & 1 deletion indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ fn run(home_dir: std::path::PathBuf, config: RunConfigArgs) -> Result<()> {
}

if cfg!(feature = "use_fastnear") {
let fastnear_indexer = FastNearIndexer::new(addresses_to_rollup_ids);
let fastnear_indexer = FastNearIndexer::new(addresses_to_rollup_ids, config.channel_width);
let validated_stream = fastnear_indexer.run();

rmq_publisher.run(validated_stream);
Expand Down

0 comments on commit 8d961f4

Please sign in to comment.