Skip to content

Commit

Permalink
update dataloader chart and quote
Browse files Browse the repository at this point in the history
  • Loading branch information
nguyenthdat committed Jan 11, 2024
1 parent 17ea3d6 commit 36f28f4
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 122 deletions.
48 changes: 23 additions & 25 deletions examples/all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,46 +21,44 @@ async fn main() -> anyhow::Result<()> {
let publisher: DataLoader = DataLoader::default();

let mut chart = chart::session::WebSocket::new(publisher.clone(), socket.clone());
// let mut quote = quote::session::WebSocket::new(publisher.clone(), socket.clone());
let mut quote = quote::session::WebSocket::new(publisher.clone(), socket.clone());

// subscriber.subscribe(&mut chart, &mut socket);

// let opts = ChartOptions::new("BINANCE:BTCUSDT", Interval::OneMinute).bar_count(100);
let opts = ChartOptions::new("BINANCE:BTCUSDT", Interval::OneMinute).bar_count(100);
let opts2 = ChartOptions::new("BINANCE:BTCUSDT", Interval::Daily)
.bar_count(1)
.study_config(
"STD;Candlestick%1Pattern%1Bearish%1Abandoned%1Baby",
"33.0",
ScriptType::IntervalScript,
);
// let opts3 = ChartOptions::new("BINANCE:BTCUSDT", Interval::OneHour);
// .replay_mode(true)
// .replay_from(1698624060);
let opts3 = ChartOptions::new("BINANCE:BTCUSDT", Interval::OneHour)
.replay_mode(true)
.replay_from(1698624060);

chart
// .set_market(opts)
// .await?
.set_market(opts)
.await?
.set_market(opts2)
.await?
.set_market(opts3)
.await?;
// .set_market(opts3)
// .await?;

// quote
// .create_session()
// .await?
// .set_fields()
// .await?
// .add_symbols(vec![
// "SP:SPX",
// "BINANCE:BTCUSDT",
// "BINANCE:ETHUSDT",
// "BITSTAMP:ETHUSD",
// "NASDAQ:TSLA", // "BINANCE:B",
// ])
// .await?;
quote
.create_session()
.await?
.set_fields()
.await?
.add_symbols(vec![
"SP:SPX",
"BINANCE:BTCUSDT",
"BINANCE:ETHUSDT",
"BITSTAMP:ETHUSD",
"NASDAQ:TSLA", // "BINANCE:B",
])
.await?;

tokio::spawn(async move { chart.clone().subscribe().await });
// tokio::spawn(async move { quote.clone().subscribe().await });
tokio::spawn(async move { quote.clone().subscribe().await });

loop {}
}
60 changes: 35 additions & 25 deletions src/callback.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,50 @@
use crate::{
chart::{
models::{DataPoint, SeriesCompletedMessage, StudyResponseData},
models::{DataPoint, StudyResponseData, SymbolInfo},
ChartOptions, StudyOptions,
},
quote::models::QuoteValue,
socket::TradingViewDataEvent,
Error,
};
use futures_util::{future::BoxFuture, Future};
use serde_json::Value;
use std::sync::Arc;
use tracing::error;
use tracing::{error, info};

pub type AsyncCallback<'a, T> = Box<dyn (Fn(T) -> BoxFuture<'a, ()>) + Send + Sync + 'a>;

#[derive(Clone)]
pub struct Callbacks<'a> {
pub(crate) on_chart_data: Arc<AsyncCallback<'a, (ChartOptions, Vec<DataPoint>)>>,
pub(crate) on_quote_data: Arc<AsyncCallback<'a, QuoteValue>>,
pub(crate) on_series_completed: Arc<AsyncCallback<'a, SeriesCompletedMessage>>,
pub(crate) on_study_data: Arc<AsyncCallback<'a, (StudyOptions, StudyResponseData)>>,
pub(crate) on_unknown_event: Arc<AsyncCallback<'a, Value>>,
pub(crate) on_error: Arc<AsyncCallback<'a, Error>>,
pub(crate) on_symbol_info: Arc<AsyncCallback<'a, SymbolInfo>>,
pub(crate) on_other_event: Arc<AsyncCallback<'a, (TradingViewDataEvent, Vec<Value>)>>,
}

impl Default for Callbacks<'_> {
fn default() -> Self {
Self {
on_chart_data: Arc::new(Box::new(|_| Box::pin(async {}))),
on_quote_data: Arc::new(Box::new(|_| Box::pin(async {}))),
on_series_completed: Arc::new(Box::new(|_| Box::pin(async {}))),
on_study_data: Arc::new(Box::new(|_| Box::pin(async {}))),
on_unknown_event: Arc::new(Box::new(|_| Box::pin(async {}))),
on_chart_data: Arc::new(Box::new(|data| {
Box::pin(async move { info!("default callback logging && handling: {:?}", data) })
})),
on_quote_data: Arc::new(Box::new(|data| {
Box::pin(async move { info!("default callback logging && handling: {:?}", data) })
})),
on_study_data: Arc::new(Box::new(|data| {
Box::pin(async move { info!("default callback logging && handling: {:?}", data) })
})),

on_error: Arc::new(Box::new(|e| {
Box::pin(
async move { error!("default error callback logging && handling, error: {e}") },
)
Box::pin(async move { error!("default callback logging && handling: {e}") })
})),
on_symbol_info: Arc::new(Box::new(|data| {
Box::pin(async move { info!("default callback logging && handling: {:?}", data) })
})),
on_other_event: Arc::new(Box::new(|data| {
Box::pin(async move { info!("default callback logging && handling: {:?}", data) })
})),
}
}
Expand Down Expand Up @@ -63,44 +73,44 @@ impl<'a> Callbacks<'a> {
self
}

pub fn on_series_completed<Fut>(
pub fn on_study_data<Fut>(
&mut self,
f: impl Fn(SeriesCompletedMessage) -> Fut + Send + Sync + 'a,
f: impl Fn((StudyOptions, StudyResponseData)) -> Fut + Send + Sync + 'a,
) -> &mut Self
where
Fut: Future<Output = ()> + Send + 'a,
{
self.on_series_completed = Arc::new(Box::new(move |data| Box::pin(f(data))));
self.on_study_data = Arc::new(Box::new(move |data| Box::pin(f(data))));
self
}

pub fn on_study_data<Fut>(
&mut self,
f: impl Fn((StudyOptions, StudyResponseData)) -> Fut + Send + Sync + 'a,
) -> &mut Self
pub fn on_error<Fut>(&mut self, f: impl Fn(Error) -> Fut + Send + Sync + 'a) -> &mut Self
where
Fut: Future<Output = ()> + Send + 'a,
{
self.on_study_data = Arc::new(Box::new(move |data| Box::pin(f(data))));
self.on_error = Arc::new(Box::new(move |data| Box::pin(f(data))));
self
}

pub fn on_unknown_event<Fut>(
pub fn on_symbol_info<Fut>(
&mut self,
f: impl Fn(Value) -> Fut + Send + Sync + 'a,
f: impl Fn(SymbolInfo) -> Fut + Send + Sync + 'a,
) -> &mut Self
where
Fut: Future<Output = ()> + Send + 'a,
{
self.on_unknown_event = Arc::new(Box::new(move |data| Box::pin(f(data))));
self.on_symbol_info = Arc::new(Box::new(move |data| Box::pin(f(data))));
self
}

pub fn on_error<Fut>(&mut self, f: impl Fn(Error) -> Fut + Send + Sync + 'a) -> &mut Self
pub fn on_other_event<Fut>(
&mut self,
f: impl Fn((TradingViewDataEvent, Vec<Value>)) -> Fut + Send + Sync + 'a,
) -> &mut Self
where
Fut: Future<Output = ()> + Send + 'a,
{
self.on_error = Arc::new(Box::new(move |data| Box::pin(f(data))));
self.on_other_event = Arc::new(Box::new(move |data| Box::pin(f(data))));
self
}
}
8 changes: 1 addition & 7 deletions src/chart/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,14 @@ pub struct ChartDataChanges {

#[cfg_attr(not(feature = "protobuf"), derive(Debug, Default))]
#[cfg_attr(feature = "protobuf", derive(prost::Message))]
#[derive(Clone, PartialEq, Serialize, Deserialize, Hash)]
#[derive(Clone, PartialEq, Serialize, Hash)]
pub struct SeriesCompletedMessage {
#[cfg_attr(feature = "protobuf", prost(string, tag = "1"))]
#[serde(default)]
pub id: String,
#[cfg_attr(feature = "protobuf", prost(string, tag = "2"))]
#[serde(default)]
pub session: String,
#[cfg_attr(feature = "protobuf", prost(string, tag = "3"))]
#[serde(default)]
pub update_mode: String,
#[cfg_attr(feature = "protobuf", prost(string, tag = "4"))]
#[serde(default)]
pub version: String,
}

#[cfg_attr(not(feature = "protobuf"), derive(Debug, Default))]
Expand Down
26 changes: 0 additions & 26 deletions src/chart/utils.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1 @@
use serde::{Deserialize, Deserializer};

pub fn deserialize_string_default<'de, D>(deserializer: D) -> Result<String, D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer).unwrap_or_default();
Ok(s)
}

pub fn deserialize_vec_default<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
where
T: Deserialize<'de>,
D: Deserializer<'de>,
{
let s: Vec<T> = Deserialize::deserialize(deserializer).unwrap_or_default();
Ok(s)
}

pub fn deserialize_option_default<'de, D, T>(deserializer: D) -> Result<Option<T>, D::Error>
where
T: Deserialize<'de>,
D: Deserializer<'de>,
{
let s: Option<T> = Deserialize::deserialize(deserializer).unwrap_or_default();
Ok(s)
}
51 changes: 12 additions & 39 deletions src/data_loader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
callback::Callbacks,
chart::{
models::{ChartResponseData, SeriesCompletedMessage, StudyResponseData, SymbolInfo},
models::{ChartResponseData, StudyResponseData, SymbolInfo},
session::SeriesInfo,
StudyOptions,
},
Expand All @@ -16,7 +16,7 @@ use crate::{
use serde::Deserialize;
use serde_json::Value;
use std::collections::HashMap;
use tracing::{debug, error, info, trace};
use tracing::{debug, error, trace};

#[derive(Clone, Default)]
pub struct DataLoader<'a> {
Expand Down Expand Up @@ -52,55 +52,28 @@ impl<'a> DataLoader<'a> {
{
Ok(_) => (),
Err(e) => {
error!("chart data parsing error: {:?}", e);
(self.callbacks.on_error)(e).await;
}
};
}
TradingViewDataEvent::OnQuoteData => self.handle_quote_data(message).await,
TradingViewDataEvent::OnQuoteCompleted => {
info!("quote completed: {:?}", message)
}
TradingViewDataEvent::OnSeriesLoading => {
trace!("series is loading: {:#?}", message);
}
TradingViewDataEvent::OnSeriesCompleted => {
match SeriesCompletedMessage::deserialize(&message[1]) {
Ok(s) => info!("series completed: {:#?}", s),
Err(e) => {
error!("{:?}", e);
// return SymbolInfo::default();
}
};
}
TradingViewDataEvent::OnSymbolResolved => {
match SymbolInfo::deserialize(&message[2]) {
Ok(s) => debug!("{:?}", s),
Ok(s) => {
debug!("receive symbol info: {:?}", s);
(self.callbacks.on_symbol_info)(s).await;
}
Err(e) => {
error!("{:?}", e);
// return SymbolInfo::default();
error!("symbol resolved parsing error: {:?}", e);
(self.callbacks.on_error)(Error::JsonParseError(e)).await;
}
};
// info!("symbol resolved: {:?}", &message[2]);
// debug!("{:?}", symbol_info)
// let symbol_info = serde_json::from_value::<SymbolInfo>(message[2].clone())?;
}
TradingViewDataEvent::OnReplayOk => {
info!("replay ok: {:?}", message);
}
TradingViewDataEvent::OnReplayPoint => {
info!("replay point: {:?}", message);
}
TradingViewDataEvent::OnReplayInstanceId => {
info!("replay instance id: {:?}", message);
}
TradingViewDataEvent::OnReplayResolutions => todo!("8"),
TradingViewDataEvent::OnReplayDataEnd => todo!("9"),
TradingViewDataEvent::OnStudyLoading => todo!("10"),
TradingViewDataEvent::OnStudyCompleted => {
info!("study completed: {:?}", message);
_ => {
debug!("event: {:?}, message: {:?}", event, message);
(self.callbacks.on_other_event)((event, message.to_owned())).await;
}
TradingViewDataEvent::OnError(e) => error!("error: {:?}", e),
TradingViewDataEvent::UnknownEvent(_) => todo!("13"),
}
}

Expand Down

0 comments on commit 36f28f4

Please sign in to comment.