Skip to content

Commit

Permalink
Improve data loader functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
nguyenthdat committed Jan 11, 2024
1 parent eb4acff commit b05885a
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 43 deletions.
35 changes: 27 additions & 8 deletions src/callback.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
use crate::{
chart::models::{SeriesCompletedMessage, StudyResponseData},
quote::models::QuoteData,
chart::{
models::{DataPoint, SeriesCompletedMessage, StudyResponseData},
ChartOptions, StudyOptions,
},
quote::models::QuoteValue,
Error,
};
use futures_util::{future::BoxFuture, Future};
use serde_json::Value;
use std::sync::Arc;
use tracing::error;

pub type AsyncCallback<'a, T> = Box<dyn (Fn(T) -> BoxFuture<'a, ()>) + Send + Sync + 'a>;

#[derive(Clone)]
pub struct Callbacks<'a> {
pub(crate) on_chart_data: Arc<AsyncCallback<'a, Vec<Vec<f64>>>>,
pub(crate) on_quote_data: Arc<AsyncCallback<'a, QuoteData>>,
pub(crate) on_chart_data: Arc<AsyncCallback<'a, (ChartOptions, Vec<DataPoint>)>>,
pub(crate) on_quote_data: Arc<AsyncCallback<'a, QuoteValue>>,
pub(crate) on_series_completed: Arc<AsyncCallback<'a, SeriesCompletedMessage>>,
pub(crate) on_study_completed: Arc<AsyncCallback<'a, StudyResponseData>>,
pub(crate) on_study_completed: Arc<AsyncCallback<'a, (StudyOptions, StudyResponseData)>>,
pub(crate) on_unknown_event: Arc<AsyncCallback<'a, Value>>,
pub(crate) on_error: Arc<AsyncCallback<'a, Error>>,
}

impl Default for Callbacks<'_> {
Expand All @@ -25,14 +31,19 @@ impl Default for Callbacks<'_> {
on_series_completed: Arc::new(Box::new(|_| Box::pin(async {}))),
on_study_completed: Arc::new(Box::new(|_| Box::pin(async {}))),
on_unknown_event: Arc::new(Box::new(|_| Box::pin(async {}))),
on_error: Arc::new(Box::new(|e| {
Box::pin(
async move { error!("default error callback logging && handling, error: {e}") },
)
})),
}
}
}

impl<'a> Callbacks<'a> {
pub fn on_chart_data<Fut>(
&mut self,
f: impl Fn(Vec<Vec<f64>>) -> Fut + Send + Sync + 'a,
f: impl Fn((ChartOptions, Vec<DataPoint>)) -> Fut + Send + Sync + 'a,
) -> &mut Self
where
Fut: Future<Output = ()> + Send + 'a,
Expand All @@ -43,7 +54,7 @@ impl<'a> Callbacks<'a> {

pub fn on_quote_data<Fut>(
&mut self,
f: impl Fn(QuoteData) -> Fut + Send + Sync + 'a,
f: impl Fn(QuoteValue) -> Fut + Send + Sync + 'a,
) -> &mut Self
where
Fut: Future<Output = ()> + Send + 'a,
Expand All @@ -65,7 +76,7 @@ impl<'a> Callbacks<'a> {

pub fn on_study_completed<Fut>(
&mut self,
f: impl Fn(StudyResponseData) -> Fut + Send + Sync + 'a,
f: impl Fn((StudyOptions, StudyResponseData)) -> Fut + Send + Sync + 'a,
) -> &mut Self
where
Fut: Future<Output = ()> + Send + 'a,
Expand All @@ -84,4 +95,12 @@ impl<'a> Callbacks<'a> {
self.on_unknown_event = Arc::new(Box::new(move |data| Box::pin(f(data))));
self
}

pub fn on_error<Fut>(&mut self, f: impl Fn(Error) -> Fut + Send + Sync + 'a) -> &mut Self
where
Fut: Future<Output = ()> + Send + 'a,
{
self.on_error = Arc::new(Box::new(move |data| Box::pin(f(data))));
self
}
}
6 changes: 3 additions & 3 deletions src/chart/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use iso_currency::Currency;

use crate::models::{ MarketAdjustment, SessionType, pine_indicator::ScriptType, Interval };
use crate::models::{pine_indicator::ScriptType, Interval, MarketAdjustment, SessionType};

pub mod session;
pub mod study;
pub mod models;
pub(crate) mod options;
pub mod session;
pub mod study;
pub(crate) mod utils;

#[derive(Default, Debug, Clone)]
Expand Down
14 changes: 5 additions & 9 deletions src/chart/models.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::models::Interval;
use serde::{Deserialize, Serialize};
use serde_json::Value;

Expand Down Expand Up @@ -50,10 +49,14 @@ pub struct GraphicDataResponse {
pub indexes: Value,
}

#[derive(Debug, Clone, Deserialize)]
#[cfg_attr(not(feature = "protobuf"), derive(Debug, Default))]
#[cfg_attr(feature = "protobuf", derive(prost::Message))]
#[derive(Clone, Deserialize)]
pub struct DataPoint {
#[cfg_attr(feature = "protobuf", prost(int64, tag = "1"))]
#[serde(rename(deserialize = "i"))]
pub index: i64,
#[cfg_attr(feature = "protobuf", prost(double, repeated, tag = "2"))]
#[serde(rename(deserialize = "v"))]
pub value: Vec<f64>,
}
Expand All @@ -67,13 +70,6 @@ pub struct ChartDataChanges {
pub zoffset: i64,
}

#[derive(Debug, Default, Clone, Serialize)]
pub struct ChartSeriesData {
pub symbol: String,
pub interval: Interval,
pub data: Vec<Vec<f64>>,
}

#[cfg_attr(not(feature = "protobuf"), derive(Debug, Default))]
#[cfg_attr(feature = "protobuf", derive(prost::Message))]
#[derive(Clone, Serialize)]
Expand Down
6 changes: 5 additions & 1 deletion src/chart/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
payload,
socket::{Socket, SocketMessageDe, SocketSession, TradingViewDataEvent},
utils::{gen_id, gen_session_id, symbol_init},
Result,
Error, Result,
};
use serde_json::Value;
use std::fmt::Debug;
Expand Down Expand Up @@ -453,4 +453,8 @@ impl<'a> Socket for WebSocket<'a> {
self.data_loader.handle_events(event, &message.p).await;
Ok(())
}

async fn handle_error(&mut self, error: Error) {
(self.data_loader.callbacks.on_error)(error).await;
}
}
32 changes: 15 additions & 17 deletions src/data_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,23 @@ use crate::{
models::{ChartResponseData, StudyResponseData, SymbolInfo},
session::SeriesInfo,
},
error::TradingViewError,
quote::{
models::{QuoteData, QuoteValue},
utils::merge_quotes,
},
socket::TradingViewDataEvent,
Result,
Error, Result,
};
use serde::Deserialize;
use serde_json::{error, Value};
use std::{collections::HashMap, f32::consts::E};
use serde_json::Value;
use std::collections::HashMap;
use tracing::{debug, error, info, trace, warn};

#[derive(Clone, Default)]
pub struct DataLoader<'a> {
pub(crate) metadata: Metadata,
callbacks: Callbacks<'a>,
pub(crate) callbacks: Callbacks<'a>,
}

#[derive(Default, Debug, Clone)]
Expand Down Expand Up @@ -49,7 +50,9 @@ impl<'a> DataLoader<'a> {
.await
{
Ok(_) => (),
Err(e) => error!("{}", e),
Err(e) => {
(self.callbacks.on_error)(e).await;
}
};
}
TradingViewDataEvent::OnQuoteData => self.handle_quote_data(message).await,
Expand Down Expand Up @@ -111,24 +114,16 @@ impl<'a> DataLoader<'a> {
trace!("received v: {:?}, m: {:?}", s, message);
match message[1].get(id.as_str()) {
Some(resp_data) => {
let data: Vec<Vec<f64>> = ChartResponseData::deserialize(resp_data)?
.series
.into_iter()
.map(|point| point.value)
.collect();
// timestamp, open, high, low, close, volume
let data = ChartResponseData::deserialize(resp_data)?.series;
debug!("series data extracted: {:?}", data);
// TODO: Notify function
(self.callbacks.on_chart_data)(data).await;
(self.callbacks.on_chart_data)((s.options.clone(), data)).await;
}
None => {
debug!("receive empty data on series: {:?}", s);
}
}
}

self.handle_study_data(studies, message).await?;

Ok(())
}

Expand Down Expand Up @@ -160,11 +155,14 @@ impl<'a> DataLoader<'a> {

for q in self.metadata.quotes.values() {
debug!("quote data: {:?}", q);
// TODO: Notify function for quote data
(self.callbacks.on_quote_data)(q.to_owned()).await;
}
} else {
error!("quote data status error: {:?}", qsd);
// TODO: Notify function for quote data error
(self.callbacks.on_error)(Error::TradingViewError(
TradingViewError::QuoteDataStatusError,
))
.await;
}
}
}
8 changes: 6 additions & 2 deletions src/quote/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ use crate::{
quote::ALL_QUOTE_FIELDS,
socket::{Socket, SocketMessageDe, SocketSession, TradingViewDataEvent},
utils::gen_session_id,
Result,
Error, Result,
};
use serde_json::Value;

#[derive(Clone)]
pub struct WebSocket<'a> {
data_loader: DataLoader<'a>,
pub(crate) data_loader: DataLoader<'a>,
socket: SocketSession,
}

Expand Down Expand Up @@ -87,4 +87,8 @@ impl<'a> Socket for WebSocket<'a> {
self.data_loader.handle_events(event, &message.p).await;
Ok(())
}

async fn handle_error(&mut self, error: Error) {
(self.data_loader.callbacks.on_error)(error).await;
}
}
4 changes: 1 addition & 3 deletions src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,5 @@ pub trait Socket {

async fn handle_message_data(&mut self, message: SocketMessageDe) -> Result<()>;

async fn handle_error(&mut self, error: Error) {
error!("{}", error);
}
async fn handle_error(&mut self, error: Error);
}

0 comments on commit b05885a

Please sign in to comment.