diff --git a/examples/all.rs b/examples/all.rs index dd7bd2b..0040580 100644 --- a/examples/all.rs +++ b/examples/all.rs @@ -21,11 +21,9 @@ async fn main() -> anyhow::Result<()> { let publisher: DataLoader = DataLoader::default(); let mut chart = chart::session::WebSocket::new(publisher.clone(), socket.clone()); - // let mut quote = quote::session::WebSocket::new(publisher.clone(), socket.clone()); + let mut quote = quote::session::WebSocket::new(publisher.clone(), socket.clone()); - // subscriber.subscribe(&mut chart, &mut socket); - - // let opts = ChartOptions::new("BINANCE:BTCUSDT", Interval::OneMinute).bar_count(100); + let opts = ChartOptions::new("BINANCE:BTCUSDT", Interval::OneMinute).bar_count(100); let opts2 = ChartOptions::new("BINANCE:BTCUSDT", Interval::Daily) .bar_count(1) .study_config( @@ -33,34 +31,34 @@ async fn main() -> anyhow::Result<()> { "33.0", ScriptType::IntervalScript, ); - // let opts3 = ChartOptions::new("BINANCE:BTCUSDT", Interval::OneHour); - // .replay_mode(true) - // .replay_from(1698624060); + let opts3 = ChartOptions::new("BINANCE:BTCUSDT", Interval::OneHour) + .replay_mode(true) + .replay_from(1698624060); chart - // .set_market(opts) - // .await? + .set_market(opts) + .await? .set_market(opts2) + .await? + .set_market(opts3) .await?; - // .set_market(opts3) - // .await?; - // quote - // .create_session() - // .await? - // .set_fields() - // .await? - // .add_symbols(vec![ - // "SP:SPX", - // "BINANCE:BTCUSDT", - // "BINANCE:ETHUSDT", - // "BITSTAMP:ETHUSD", - // "NASDAQ:TSLA", // "BINANCE:B", - // ]) - // .await?; + quote + .create_session() + .await? + .set_fields() + .await? + .add_symbols(vec![ + "SP:SPX", + "BINANCE:BTCUSDT", + "BINANCE:ETHUSDT", + "BITSTAMP:ETHUSD", + "NASDAQ:TSLA", // "BINANCE:B", + ]) + .await?; tokio::spawn(async move { chart.clone().subscribe().await }); - // tokio::spawn(async move { quote.clone().subscribe().await }); + tokio::spawn(async move { quote.clone().subscribe().await }); loop {} } diff --git a/src/callback.rs b/src/callback.rs index cc69d80..0b37523 100644 --- a/src/callback.rs +++ b/src/callback.rs @@ -1,15 +1,16 @@ use crate::{ chart::{ - models::{DataPoint, SeriesCompletedMessage, StudyResponseData}, + models::{DataPoint, StudyResponseData, SymbolInfo}, ChartOptions, StudyOptions, }, quote::models::QuoteValue, + socket::TradingViewDataEvent, Error, }; use futures_util::{future::BoxFuture, Future}; use serde_json::Value; use std::sync::Arc; -use tracing::error; +use tracing::{error, info}; pub type AsyncCallback<'a, T> = Box BoxFuture<'a, ()>) + Send + Sync + 'a>; @@ -17,24 +18,33 @@ pub type AsyncCallback<'a, T> = Box BoxFuture<'a, ()>) + Send + Sy pub struct Callbacks<'a> { pub(crate) on_chart_data: Arc)>>, pub(crate) on_quote_data: Arc>, - pub(crate) on_series_completed: Arc>, pub(crate) on_study_data: Arc>, - pub(crate) on_unknown_event: Arc>, pub(crate) on_error: Arc>, + pub(crate) on_symbol_info: Arc>, + pub(crate) on_other_event: Arc)>>, } impl Default for Callbacks<'_> { fn default() -> Self { Self { - on_chart_data: Arc::new(Box::new(|_| Box::pin(async {}))), - on_quote_data: Arc::new(Box::new(|_| Box::pin(async {}))), - on_series_completed: Arc::new(Box::new(|_| Box::pin(async {}))), - on_study_data: Arc::new(Box::new(|_| Box::pin(async {}))), - on_unknown_event: Arc::new(Box::new(|_| Box::pin(async {}))), + on_chart_data: Arc::new(Box::new(|data| { + Box::pin(async move { info!("default callback logging && handling: {:?}", data) }) + })), + on_quote_data: Arc::new(Box::new(|data| { + Box::pin(async move { info!("default callback logging && handling: {:?}", data) }) + })), + on_study_data: Arc::new(Box::new(|data| { + Box::pin(async move { info!("default callback logging && handling: {:?}", data) }) + })), + on_error: Arc::new(Box::new(|e| { - Box::pin( - async move { error!("default error callback logging && handling, error: {e}") }, - ) + Box::pin(async move { error!("default callback logging && handling: {e}") }) + })), + on_symbol_info: Arc::new(Box::new(|data| { + Box::pin(async move { info!("default callback logging && handling: {:?}", data) }) + })), + on_other_event: Arc::new(Box::new(|data| { + Box::pin(async move { info!("default callback logging && handling: {:?}", data) }) })), } } @@ -63,44 +73,44 @@ impl<'a> Callbacks<'a> { self } - pub fn on_series_completed( + pub fn on_study_data( &mut self, - f: impl Fn(SeriesCompletedMessage) -> Fut + Send + Sync + 'a, + f: impl Fn((StudyOptions, StudyResponseData)) -> Fut + Send + Sync + 'a, ) -> &mut Self where Fut: Future + Send + 'a, { - self.on_series_completed = Arc::new(Box::new(move |data| Box::pin(f(data)))); + self.on_study_data = Arc::new(Box::new(move |data| Box::pin(f(data)))); self } - pub fn on_study_data( - &mut self, - f: impl Fn((StudyOptions, StudyResponseData)) -> Fut + Send + Sync + 'a, - ) -> &mut Self + pub fn on_error(&mut self, f: impl Fn(Error) -> Fut + Send + Sync + 'a) -> &mut Self where Fut: Future + Send + 'a, { - self.on_study_data = Arc::new(Box::new(move |data| Box::pin(f(data)))); + self.on_error = Arc::new(Box::new(move |data| Box::pin(f(data)))); self } - pub fn on_unknown_event( + pub fn on_symbol_info( &mut self, - f: impl Fn(Value) -> Fut + Send + Sync + 'a, + f: impl Fn(SymbolInfo) -> Fut + Send + Sync + 'a, ) -> &mut Self where Fut: Future + Send + 'a, { - self.on_unknown_event = Arc::new(Box::new(move |data| Box::pin(f(data)))); + self.on_symbol_info = Arc::new(Box::new(move |data| Box::pin(f(data)))); self } - pub fn on_error(&mut self, f: impl Fn(Error) -> Fut + Send + Sync + 'a) -> &mut Self + pub fn on_other_event( + &mut self, + f: impl Fn((TradingViewDataEvent, Vec)) -> Fut + Send + Sync + 'a, + ) -> &mut Self where Fut: Future + Send + 'a, { - self.on_error = Arc::new(Box::new(move |data| Box::pin(f(data)))); + self.on_other_event = Arc::new(Box::new(move |data| Box::pin(f(data)))); self } } diff --git a/src/chart/models.rs b/src/chart/models.rs index 9c8ab3e..299660e 100644 --- a/src/chart/models.rs +++ b/src/chart/models.rs @@ -72,20 +72,14 @@ pub struct ChartDataChanges { #[cfg_attr(not(feature = "protobuf"), derive(Debug, Default))] #[cfg_attr(feature = "protobuf", derive(prost::Message))] -#[derive(Clone, PartialEq, Serialize, Deserialize, Hash)] +#[derive(Clone, PartialEq, Serialize, Hash)] pub struct SeriesCompletedMessage { #[cfg_attr(feature = "protobuf", prost(string, tag = "1"))] #[serde(default)] pub id: String, #[cfg_attr(feature = "protobuf", prost(string, tag = "2"))] #[serde(default)] - pub session: String, - #[cfg_attr(feature = "protobuf", prost(string, tag = "3"))] - #[serde(default)] pub update_mode: String, - #[cfg_attr(feature = "protobuf", prost(string, tag = "4"))] - #[serde(default)] - pub version: String, } #[cfg_attr(not(feature = "protobuf"), derive(Debug, Default))] diff --git a/src/chart/utils.rs b/src/chart/utils.rs index c149f3c..8b13789 100644 --- a/src/chart/utils.rs +++ b/src/chart/utils.rs @@ -1,27 +1 @@ -use serde::{Deserialize, Deserializer}; -pub fn deserialize_string_default<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - let s: String = Deserialize::deserialize(deserializer).unwrap_or_default(); - Ok(s) -} - -pub fn deserialize_vec_default<'de, D, T>(deserializer: D) -> Result, D::Error> -where - T: Deserialize<'de>, - D: Deserializer<'de>, -{ - let s: Vec = Deserialize::deserialize(deserializer).unwrap_or_default(); - Ok(s) -} - -pub fn deserialize_option_default<'de, D, T>(deserializer: D) -> Result, D::Error> -where - T: Deserialize<'de>, - D: Deserializer<'de>, -{ - let s: Option = Deserialize::deserialize(deserializer).unwrap_or_default(); - Ok(s) -} diff --git a/src/data_loader.rs b/src/data_loader.rs index 5a047f8..95bafd7 100644 --- a/src/data_loader.rs +++ b/src/data_loader.rs @@ -1,7 +1,7 @@ use crate::{ callback::Callbacks, chart::{ - models::{ChartResponseData, SeriesCompletedMessage, StudyResponseData, SymbolInfo}, + models::{ChartResponseData, StudyResponseData, SymbolInfo}, session::SeriesInfo, StudyOptions, }, @@ -16,7 +16,7 @@ use crate::{ use serde::Deserialize; use serde_json::Value; use std::collections::HashMap; -use tracing::{debug, error, info, trace}; +use tracing::{debug, error, trace}; #[derive(Clone, Default)] pub struct DataLoader<'a> { @@ -52,55 +52,28 @@ impl<'a> DataLoader<'a> { { Ok(_) => (), Err(e) => { + error!("chart data parsing error: {:?}", e); (self.callbacks.on_error)(e).await; } }; } TradingViewDataEvent::OnQuoteData => self.handle_quote_data(message).await, - TradingViewDataEvent::OnQuoteCompleted => { - info!("quote completed: {:?}", message) - } - TradingViewDataEvent::OnSeriesLoading => { - trace!("series is loading: {:#?}", message); - } - TradingViewDataEvent::OnSeriesCompleted => { - match SeriesCompletedMessage::deserialize(&message[1]) { - Ok(s) => info!("series completed: {:#?}", s), - Err(e) => { - error!("{:?}", e); - // return SymbolInfo::default(); - } - }; - } TradingViewDataEvent::OnSymbolResolved => { match SymbolInfo::deserialize(&message[2]) { - Ok(s) => debug!("{:?}", s), + Ok(s) => { + debug!("receive symbol info: {:?}", s); + (self.callbacks.on_symbol_info)(s).await; + } Err(e) => { - error!("{:?}", e); - // return SymbolInfo::default(); + error!("symbol resolved parsing error: {:?}", e); + (self.callbacks.on_error)(Error::JsonParseError(e)).await; } }; - // info!("symbol resolved: {:?}", &message[2]); - // debug!("{:?}", symbol_info) - // let symbol_info = serde_json::from_value::(message[2].clone())?; - } - TradingViewDataEvent::OnReplayOk => { - info!("replay ok: {:?}", message); - } - TradingViewDataEvent::OnReplayPoint => { - info!("replay point: {:?}", message); - } - TradingViewDataEvent::OnReplayInstanceId => { - info!("replay instance id: {:?}", message); } - TradingViewDataEvent::OnReplayResolutions => todo!("8"), - TradingViewDataEvent::OnReplayDataEnd => todo!("9"), - TradingViewDataEvent::OnStudyLoading => todo!("10"), - TradingViewDataEvent::OnStudyCompleted => { - info!("study completed: {:?}", message); + _ => { + debug!("event: {:?}, message: {:?}", event, message); + (self.callbacks.on_other_event)((event, message.to_owned())).await; } - TradingViewDataEvent::OnError(e) => error!("error: {:?}", e), - TradingViewDataEvent::UnknownEvent(_) => todo!("13"), } }