From b05885ab63e30ba237a1e4de78a240aa92bc8f9f Mon Sep 17 00:00:00 2001 From: Dat Nguyen Date: Thu, 11 Jan 2024 12:14:42 +0700 Subject: [PATCH] Improve data loader functionality --- src/callback.rs | 35 +++++++++++++++++++++++++++-------- src/chart/mod.rs | 6 +++--- src/chart/models.rs | 14 +++++--------- src/chart/session.rs | 6 +++++- src/data_loader.rs | 32 +++++++++++++++----------------- src/quote/session.rs | 8 ++++++-- src/socket.rs | 4 +--- 7 files changed, 62 insertions(+), 43 deletions(-) diff --git a/src/callback.rs b/src/callback.rs index d285d83..9d304e6 100644 --- a/src/callback.rs +++ b/src/callback.rs @@ -1,20 +1,26 @@ use crate::{ - chart::models::{SeriesCompletedMessage, StudyResponseData}, - quote::models::QuoteData, + chart::{ + models::{DataPoint, SeriesCompletedMessage, StudyResponseData}, + ChartOptions, StudyOptions, + }, + quote::models::QuoteValue, + Error, }; use futures_util::{future::BoxFuture, Future}; use serde_json::Value; use std::sync::Arc; +use tracing::error; pub type AsyncCallback<'a, T> = Box BoxFuture<'a, ()>) + Send + Sync + 'a>; #[derive(Clone)] pub struct Callbacks<'a> { - pub(crate) on_chart_data: Arc>>>, - pub(crate) on_quote_data: Arc>, + pub(crate) on_chart_data: Arc)>>, + pub(crate) on_quote_data: Arc>, pub(crate) on_series_completed: Arc>, - pub(crate) on_study_completed: Arc>, + pub(crate) on_study_completed: Arc>, pub(crate) on_unknown_event: Arc>, + pub(crate) on_error: Arc>, } impl Default for Callbacks<'_> { @@ -25,6 +31,11 @@ impl Default for Callbacks<'_> { on_series_completed: Arc::new(Box::new(|_| Box::pin(async {}))), on_study_completed: Arc::new(Box::new(|_| Box::pin(async {}))), on_unknown_event: Arc::new(Box::new(|_| Box::pin(async {}))), + on_error: Arc::new(Box::new(|e| { + Box::pin( + async move { error!("default error callback logging && handling, error: {e}") }, + ) + })), } } } @@ -32,7 +43,7 @@ impl Default for Callbacks<'_> { impl<'a> Callbacks<'a> { pub fn on_chart_data( &mut self, - f: impl Fn(Vec>) -> Fut + Send + Sync + 'a, + f: impl Fn((ChartOptions, Vec)) -> Fut + Send + Sync + 'a, ) -> &mut Self where Fut: Future + Send + 'a, @@ -43,7 +54,7 @@ impl<'a> Callbacks<'a> { pub fn on_quote_data( &mut self, - f: impl Fn(QuoteData) -> Fut + Send + Sync + 'a, + f: impl Fn(QuoteValue) -> Fut + Send + Sync + 'a, ) -> &mut Self where Fut: Future + Send + 'a, @@ -65,7 +76,7 @@ impl<'a> Callbacks<'a> { pub fn on_study_completed( &mut self, - f: impl Fn(StudyResponseData) -> Fut + Send + Sync + 'a, + f: impl Fn((StudyOptions, StudyResponseData)) -> Fut + Send + Sync + 'a, ) -> &mut Self where Fut: Future + Send + 'a, @@ -84,4 +95,12 @@ impl<'a> Callbacks<'a> { self.on_unknown_event = Arc::new(Box::new(move |data| Box::pin(f(data)))); self } + + pub fn on_error(&mut self, f: impl Fn(Error) -> Fut + Send + Sync + 'a) -> &mut Self + where + Fut: Future + Send + 'a, + { + self.on_error = Arc::new(Box::new(move |data| Box::pin(f(data)))); + self + } } diff --git a/src/chart/mod.rs b/src/chart/mod.rs index c4ab39b..f9fff7c 100644 --- a/src/chart/mod.rs +++ b/src/chart/mod.rs @@ -1,11 +1,11 @@ use iso_currency::Currency; -use crate::models::{ MarketAdjustment, SessionType, pine_indicator::ScriptType, Interval }; +use crate::models::{pine_indicator::ScriptType, Interval, MarketAdjustment, SessionType}; -pub mod session; -pub mod study; pub mod models; pub(crate) mod options; +pub mod session; +pub mod study; pub(crate) mod utils; #[derive(Default, Debug, Clone)] diff --git a/src/chart/models.rs b/src/chart/models.rs index f28fcbc..61f8b00 100644 --- a/src/chart/models.rs +++ b/src/chart/models.rs @@ -1,4 +1,3 @@ -use crate::models::Interval; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -50,10 +49,14 @@ pub struct GraphicDataResponse { pub indexes: Value, } -#[derive(Debug, Clone, Deserialize)] +#[cfg_attr(not(feature = "protobuf"), derive(Debug, Default))] +#[cfg_attr(feature = "protobuf", derive(prost::Message))] +#[derive(Clone, Deserialize)] pub struct DataPoint { + #[cfg_attr(feature = "protobuf", prost(int64, tag = "1"))] #[serde(rename(deserialize = "i"))] pub index: i64, + #[cfg_attr(feature = "protobuf", prost(double, repeated, tag = "2"))] #[serde(rename(deserialize = "v"))] pub value: Vec, } @@ -67,13 +70,6 @@ pub struct ChartDataChanges { pub zoffset: i64, } -#[derive(Debug, Default, Clone, Serialize)] -pub struct ChartSeriesData { - pub symbol: String, - pub interval: Interval, - pub data: Vec>, -} - #[cfg_attr(not(feature = "protobuf"), derive(Debug, Default))] #[cfg_attr(feature = "protobuf", derive(prost::Message))] #[derive(Clone, Serialize)] diff --git a/src/chart/session.rs b/src/chart/session.rs index ef85cca..1ed776c 100644 --- a/src/chart/session.rs +++ b/src/chart/session.rs @@ -5,7 +5,7 @@ use crate::{ payload, socket::{Socket, SocketMessageDe, SocketSession, TradingViewDataEvent}, utils::{gen_id, gen_session_id, symbol_init}, - Result, + Error, Result, }; use serde_json::Value; use std::fmt::Debug; @@ -453,4 +453,8 @@ impl<'a> Socket for WebSocket<'a> { self.data_loader.handle_events(event, &message.p).await; Ok(()) } + + async fn handle_error(&mut self, error: Error) { + (self.data_loader.callbacks.on_error)(error).await; + } } diff --git a/src/data_loader.rs b/src/data_loader.rs index b24f8bd..4925c63 100644 --- a/src/data_loader.rs +++ b/src/data_loader.rs @@ -4,22 +4,23 @@ use crate::{ models::{ChartResponseData, StudyResponseData, SymbolInfo}, session::SeriesInfo, }, + error::TradingViewError, quote::{ models::{QuoteData, QuoteValue}, utils::merge_quotes, }, socket::TradingViewDataEvent, - Result, + Error, Result, }; use serde::Deserialize; -use serde_json::{error, Value}; -use std::{collections::HashMap, f32::consts::E}; +use serde_json::Value; +use std::collections::HashMap; use tracing::{debug, error, info, trace, warn}; #[derive(Clone, Default)] pub struct DataLoader<'a> { pub(crate) metadata: Metadata, - callbacks: Callbacks<'a>, + pub(crate) callbacks: Callbacks<'a>, } #[derive(Default, Debug, Clone)] @@ -49,7 +50,9 @@ impl<'a> DataLoader<'a> { .await { Ok(_) => (), - Err(e) => error!("{}", e), + Err(e) => { + (self.callbacks.on_error)(e).await; + } }; } TradingViewDataEvent::OnQuoteData => self.handle_quote_data(message).await, @@ -111,24 +114,16 @@ impl<'a> DataLoader<'a> { trace!("received v: {:?}, m: {:?}", s, message); match message[1].get(id.as_str()) { Some(resp_data) => { - let data: Vec> = ChartResponseData::deserialize(resp_data)? - .series - .into_iter() - .map(|point| point.value) - .collect(); - // timestamp, open, high, low, close, volume + let data = ChartResponseData::deserialize(resp_data)?.series; debug!("series data extracted: {:?}", data); - // TODO: Notify function - (self.callbacks.on_chart_data)(data).await; + (self.callbacks.on_chart_data)((s.options.clone(), data)).await; } None => { debug!("receive empty data on series: {:?}", s); } } } - self.handle_study_data(studies, message).await?; - Ok(()) } @@ -160,11 +155,14 @@ impl<'a> DataLoader<'a> { for q in self.metadata.quotes.values() { debug!("quote data: {:?}", q); - // TODO: Notify function for quote data + (self.callbacks.on_quote_data)(q.to_owned()).await; } } else { error!("quote data status error: {:?}", qsd); - // TODO: Notify function for quote data error + (self.callbacks.on_error)(Error::TradingViewError( + TradingViewError::QuoteDataStatusError, + )) + .await; } } } diff --git a/src/quote/session.rs b/src/quote/session.rs index 46258c8..df6c4da 100644 --- a/src/quote/session.rs +++ b/src/quote/session.rs @@ -4,13 +4,13 @@ use crate::{ quote::ALL_QUOTE_FIELDS, socket::{Socket, SocketMessageDe, SocketSession, TradingViewDataEvent}, utils::gen_session_id, - Result, + Error, Result, }; use serde_json::Value; #[derive(Clone)] pub struct WebSocket<'a> { - data_loader: DataLoader<'a>, + pub(crate) data_loader: DataLoader<'a>, socket: SocketSession, } @@ -87,4 +87,8 @@ impl<'a> Socket for WebSocket<'a> { self.data_loader.handle_events(event, &message.p).await; Ok(()) } + + async fn handle_error(&mut self, error: Error) { + (self.data_loader.callbacks.on_error)(error).await; + } } diff --git a/src/socket.rs b/src/socket.rs index 1918348..3567594 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -369,7 +369,5 @@ pub trait Socket { async fn handle_message_data(&mut self, message: SocketMessageDe) -> Result<()>; - async fn handle_error(&mut self, error: Error) { - error!("{}", error); - } + async fn handle_error(&mut self, error: Error); }