feat: add memory cache and refactor some stuff (#45)
* feat: log -> tracing

* fix: improve error log

* cleanup

* feat: add util to query the DefiLlama API

* remove print

* feat: add memory cache for defillama
EvolveArt authored Nov 23, 2024
1 parent 926e207 commit 05800ad
Showing 15 changed files with 500 additions and 232 deletions.
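
The headline change is an in-memory cache (moka) in front of the DefiLlama price queries, shared across the monitoring tasks. A minimal sketch of the pattern, keyed `(String, u64)` like the cache in this commit — the `CoinPrices` struct and the fetch body are illustrative stand-ins, not the repo's actual `CoinPricesDTO` or API call:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

use moka::future::Cache;

// Illustrative stand-in for the repo's CoinPricesDTO.
#[derive(Clone, Debug)]
struct CoinPrices {
    usd: f64,
}

#[tokio::main]
async fn main() {
    // Same shape as the cache in this commit: keyed by (coin id, timestamp).
    let cache: Cache<(String, u64), CoinPrices> = Cache::new(10_000);

    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs();

    // `get_with` runs the init future only on a miss; concurrent callers
    // awaiting the same key share the single in-flight computation.
    let prices = cache
        .get_with(("bitcoin".to_string(), now), async {
            // Hypothetical fetch; the real code queries the DefiLlama API here.
            CoinPrices { usd: 98_000.0 }
        })
        .await;

    println!("{prices:?}");
}
```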
303 changes: 269 additions & 34 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion Cargo.toml
@@ -32,15 +32,19 @@ env_logger = "0.10.1"
futures = "0.3.28"
hyper = "0.14.27"
lazy_static = "1.4.0"
log = "0.4.20"
moka = { version = "0.12.8", features = ["future"] }
num-bigint = "0.4"
phf = { version = "0.11", features = ["macros"] }
prometheus = "0.13.3"
reqwest = { version = "0.11.22", features = ["json"] }
serde = { version = "1.0.130", features = ["derive"] }
serde_json = { version = "1.0.130" }
starknet = "0.11.0"
strum = { version = "0.25.0", features = ["derive"] }
thiserror = "2.0"
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tracing-subscriber = "0.3"
url = "2.5.0"
uuid = { version = "1.4", features = ["fast-rng", "v4", "serde"] }

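In Cargo.toml, `log` drops out in favour of `tracing` and `tracing-subscriber`, and `moka` comes in for the cache. Beyond being a drop-in swap for `log::info!`, the `tracing` macros can record structured fields and group events under spans; a small illustrative sketch (the field and span names here are not from this commit):

```rust
use tracing::{info, info_span, Instrument};

async fn update_config(source: &str) {
    // `source` is captured as a structured key/value, not just formatted text.
    info!(source, "Updating config...");
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().compact().init();

    // Every event emitted inside the instrumented future is grouped under the span.
    update_config("env")
        .instrument(info_span!("config_update"))
        .await;
}
```
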
2 changes: 1 addition & 1 deletion src/config.rs
@@ -210,7 +210,7 @@ pub async fn periodic_config_update() {
let mut next_update = Instant::now() + interval;

loop {
log::info!("[CONFIG] Updating config...");
tracing::info!("[CONFIG] Updating config...");

let new_config = Config::create_from_env().await;
let updated_config = ArcSwap::from_pointee(new_config.clone());
50 changes: 21 additions & 29 deletions src/error.rs
@@ -1,39 +1,31 @@
use std::{error::Error as StdError, fmt};

use diesel::result::Error as DieselError;
use diesel_async::pooled_connection::deadpool::PoolError;
use starknet::providers::ProviderError;
use thiserror::Error;

#[derive(Debug)]
#[derive(Error, Debug)]
pub enum MonitoringError {
#[error("Price error: {0}")]
Price(String),
Database(diesel::result::Error),
Connection(diesel_async::pooled_connection::deadpool::PoolError),

#[error("Database error: {0}")]
Database(#[from] DieselError),

#[error("Connection error: {0}")]
Connection(#[from] PoolError),

#[error("API error: {0}")]
Api(String),

#[error("Conversion error: {0}")]
Conversion(String),

#[error("OnChain error: {0}")]
OnChain(String),
Provider(ProviderError),
InvalidTimestamp(u64),
}

impl StdError for MonitoringError {}

impl fmt::Display for MonitoringError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
MonitoringError::Price(e) => write!(f, "Price Error: {}", e),
MonitoringError::Database(e) => write!(f, "Database Error: {}", e),
MonitoringError::Connection(e) => write!(f, "Connection Error: {}", e),
MonitoringError::Api(e) => write!(f, "API Error: {}", e),
MonitoringError::Conversion(e) => write!(f, "Conversion Error: {}", e),
MonitoringError::OnChain(e) => write!(f, "OnChain Error: {}", e),
MonitoringError::Provider(e) => write!(f, "Provider Error: {}", e),
MonitoringError::InvalidTimestamp(e) => write!(f, "Invalid Timestamp: {}", e),
}
}
}
#[error("Provider error: {0}")]
Provider(#[from] ProviderError),

// Convert diesel error to our custom error
impl From<diesel::result::Error> for MonitoringError {
fn from(err: diesel::result::Error) -> MonitoringError {
MonitoringError::Database(err)
}
#[error("Invalid timestamp: {0}")]
InvalidTimestamp(u64),
}
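
src/error.rs collapses the hand-written `Display`, `StdError`, and `From` impls into `thiserror` derive attributes: the `#[error(...)]` strings generate `Display`, and each `#[from]` variant replaces a manual `From` impl, which is what keeps `?` conversions working at call sites. The same pattern in miniature (types here are illustrative):

```rust
use thiserror::Error;

#[derive(Error, Debug)]
enum MyError {
    // `#[from]` also generates `impl From<ParseIntError> for MyError`.
    #[error("Parse error: {0}")]
    Parse(#[from] std::num::ParseIntError),
}

fn parse_block_number(input: &str) -> Result<u64, MyError> {
    // `?` works because of the generated `From` impl.
    Ok(input.trim().parse::<u64>()?)
}

fn main() {
    assert!(parse_block_number("42").is_ok());
    assert!(matches!(parse_block_number("nope"), Err(MyError::Parse(_))));
}
```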
103 changes: 54 additions & 49 deletions src/main.rs
@@ -33,28 +33,40 @@ use deadpool::managed::Pool;
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
use diesel_async::AsyncPgConnection;
use dotenv::dotenv;
use moka::future::Cache;
use monitoring::price_deviation::CoinPricesDTO;
use tokio::task::JoinHandle;
use tokio::time::interval;

use config::{get_config, init_long_tail_asset_configuration, periodic_config_update, DataType};
use processing::common::{check_publisher_balance, data_indexers_are_synced, indexers_are_synced};
use processing::common::{check_publisher_balance, data_indexers_are_synced};
use tracing::instrument;
use utils::{is_long_tail_asset, log_monitoring_results, log_tasks_results};

#[derive(Debug)]
struct MonitoringTask {
name: String,
handle: JoinHandle<()>,
}

#[tokio::main]
async fn main() {
env_logger::init();
// Start configuring a `fmt` subscriber
let subscriber = tracing_subscriber::fmt()
.compact()
.with_file(true)
.with_line_number(true)
.with_thread_ids(true)
.with_target(false)
.finish();
tracing::subscriber::set_global_default(subscriber).unwrap();

// Load environment variables from .env file
dotenv().ok();

// Define the pairs to monitor
let monitoring_config = get_config(None).await;
log::info!("Successfully fetched config: {:?}", monitoring_config);
tracing::info!("Successfully fetched config: {:?}", monitoring_config);
tokio::spawn(server::run_metrics_server());

let database_url: String = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
@@ -65,52 +77,57 @@ async fn main() {
init_long_tail_asset_configuration();

// Monitor spot/future in parallel
let monitoring_tasks = spawn_monitoring_tasks(pool.clone(), &monitoring_config).await;
let monitoring_tasks = spawn_monitoring_tasks(pool.clone()).await;
handle_task_results(monitoring_tasks).await;
}

#[instrument(skip_all)]
async fn spawn_monitoring_tasks(
pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
monitoring_config: &config::Config,
) -> Vec<MonitoringTask> {
let mut tasks = vec![
let cache = Cache::new(10_000);

let tasks = vec![
MonitoringTask {
name: "Config Update".to_string(),
handle: tokio::spawn(periodic_config_update()),
},
MonitoringTask {
name: "Spot Monitoring".to_string(),
handle: tokio::spawn(onchain_monitor(pool.clone(), true, &DataType::Spot)),
handle: tokio::spawn(onchain_monitor(
pool.clone(),
true,
&DataType::Spot,
cache.clone(),
)),
},
MonitoringTask {
name: "Future Monitoring".to_string(),
handle: tokio::spawn(onchain_monitor(pool.clone(), true, &DataType::Future)),
handle: tokio::spawn(onchain_monitor(
pool.clone(),
true,
&DataType::Future,
cache.clone(),
)),
},
MonitoringTask {
name: "Publisher Monitoring".to_string(),
handle: tokio::spawn(publisher_monitor(pool.clone(), false)),
},
];

if monitoring_config.is_pragma_chain() {
tasks.push(MonitoringTask {
name: "Hyperlane Dispatches Monitoring".to_string(),
handle: tokio::spawn(hyperlane_dispatch_monitor(pool.clone(), true)),
});
} else {
tasks.push(MonitoringTask {
MonitoringTask {
name: "API Monitoring".to_string(),
handle: tokio::spawn(api_monitor()),
});
tasks.push(MonitoringTask {
handle: tokio::spawn(api_monitor(cache.clone())),
},
MonitoringTask {
name: "VRF Monitoring".to_string(),
handle: tokio::spawn(vrf_monitor(pool.clone())),
});
}
},
];

tasks
}
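
`spawn_monitoring_tasks` builds one `Cache::new(10_000)` and hands a `cache.clone()` to every task; moka caches are cheap, `Arc`-style handles onto shared internal state, so all the clones see the same entries. A toy demonstration of that property:

```rust
use moka::future::Cache;

#[tokio::main]
async fn main() {
    let cache: Cache<&'static str, u64> = Cache::new(100);

    // A clone is a handle to the same cache, not a copy of it.
    let writer = cache.clone();
    tokio::spawn(async move {
        writer.insert("hits", 1).await;
    })
    .await
    .unwrap();

    // The insert made through the clone is visible via the original handle.
    assert_eq!(cache.get(&"hits").await, Some(1));
}
```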

#[instrument]
async fn handle_task_results(tasks: Vec<MonitoringTask>) {
let mut results = HashMap::new();
for task in tasks {
@@ -120,9 +137,10 @@ async fn handle_task_results(tasks: Vec<MonitoringTask>) {
log_monitoring_results(results);
}

pub(crate) async fn api_monitor() {
#[instrument(skip(cache))]
pub(crate) async fn api_monitor(cache: Cache<(String, u64), CoinPricesDTO>) {
let monitoring_config = get_config(None).await;
log::info!("[API] Monitoring API..");
tracing::info!("[API] Monitoring API..");

let mut interval = interval(Duration::from_secs(30));

@@ -133,13 +151,14 @@ pub(crate) async fn api_monitor() {
.sources(DataType::Spot)
.iter()
.flat_map(|(pair, sources)| {
let my_cache = cache.clone();
if is_long_tail_asset(pair) {
vec![tokio::spawn(Box::pin(
processing::api::process_long_tail_assets(pair.clone(), sources.clone()),
))]
} else {
vec![tokio::spawn(Box::pin(
processing::api::process_data_by_pair(pair.clone()),
processing::api::process_data_by_pair(pair.clone(), my_cache),
))]
}
})
Expand All @@ -154,10 +173,12 @@ pub(crate) async fn api_monitor() {
}
}
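
The monitor loops all share one shape: a `tokio::time::interval` ticks, a batch of per-pair tasks is spawned (now with a cache handle cloned into each), and the batch is joined before the next tick. Stripped to its moving parts — the pairs and task body below are illustrative:

```rust
use std::time::Duration;

use futures::future::join_all;
use tokio::time::interval;

#[tokio::main]
async fn main() {
    let mut ticker = interval(Duration::from_secs(30));

    for _round in 0..2 {
        // The first tick completes immediately; later ticks wait out the interval.
        ticker.tick().await;

        let tasks: Vec<_> = ["BTC/USD", "ETH/USD"]
            .into_iter()
            .map(|pair| tokio::spawn(async move { println!("processing {pair}") }))
            .collect();

        // Join the whole batch before starting the next round.
        let _results = join_all(tasks).await;
    }
}
```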

#[instrument(skip(pool, cache))]
pub(crate) async fn onchain_monitor(
pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
wait_for_syncing: bool,
data_type: &DataType,
cache: Cache<(String, u64), CoinPricesDTO>,
) {
let monitoring_config = get_config(None).await;

@@ -189,12 +210,14 @@ pub(crate) async fn onchain_monitor(
tokio::spawn(Box::pin(processing::spot::process_data_by_pair(
pool.clone(),
pair.clone(),
cache.clone(),
))),
tokio::spawn(Box::pin(
processing::spot::process_data_by_pair_and_sources(
pool.clone(),
pair.clone(),
sources.to_vec(),
cache.clone(),
),
)),
]
@@ -206,12 +229,14 @@ pub(crate) async fn onchain_monitor(
tokio::spawn(Box::pin(processing::future::process_data_by_pair(
pool.clone(),
pair.clone(),
cache.clone(),
))),
tokio::spawn(Box::pin(
processing::future::process_data_by_pair_and_sources(
pool.clone(),
pair.clone(),
sources.to_vec(),
cache.clone(),
),
)),
]
@@ -224,11 +249,12 @@ pub(crate) async fn onchain_monitor(
}
}

#[instrument(skip(pool))]
pub(crate) async fn publisher_monitor(
pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
wait_for_syncing: bool,
) {
log::info!("[PUBLISHERS] Monitoring Publishers..");
tracing::info!("[PUBLISHERS] Monitoring Publishers..");

let mut interval = interval(Duration::from_secs(30));
let monitoring_config: arc_swap::Guard<std::sync::Arc<config::Config>> = get_config(None).await;
@@ -267,8 +293,9 @@ pub(crate) async fn publisher_monitor(
}
}

#[instrument(skip(pool))]
pub(crate) async fn vrf_monitor(pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>) {
log::info!("[VRF] Monitoring VRF requests..");
tracing::info!("[VRF] Monitoring VRF requests..");

let monitoring_config = get_config(None).await;
let mut interval = interval(Duration::from_secs(30));
@@ -294,25 +321,3 @@ pub(crate) async fn vrf_monitor(pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>) {
log_tasks_results("VRF", results);
}
}

pub(crate) async fn hyperlane_dispatch_monitor(
pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
wait_for_syncing: bool,
) {
let mut interval = interval(Duration::from_secs(5));

loop {
interval.tick().await; // Wait for the next tick

// Skip if indexer is still syncing
if wait_for_syncing && !indexers_are_synced("pragma_devnet_dispatch_event").await {
continue;
}

let tasks: Vec<_> = vec![tokio::spawn(Box::pin(
processing::dispatch::process_dispatch_events(pool.clone()),
))];
let results: Vec<_> = futures::future::join_all(tasks).await;
log_tasks_results("Dispatch", results);
}
}