Skip to content
This repository has been archived by the owner on Feb 1, 2024. It is now read-only.

Commit

Permalink
Add support for web sockets in binance, outline for binanceExchange_w…
Browse files Browse the repository at this point in the history
…s.go + GetTickerPrice() (#717) (part of #715)

* feat/add ttlMap + binanceExchangeWs + binanceExchangeWs.GetTickerPrice

* patch/ remove ttlmap + log in ms + add now variable

* bugfix/ deps

* patch/resolve comments

* patch/ revert version go-jwt-middleware

* patch/ refactor Set/Get/Del

* patch/ transform waitForFirstEvent to timeToWaitForFirstEvent + fix glide.lock

* patch/ update glide.yaml

* patch/ fix version for go-binance glide.yaml

* patch/ forgot defer
  • Loading branch information
tibrn authored Jul 18, 2021
1 parent 08378e1 commit 3e8093d
Show file tree
Hide file tree
Showing 4 changed files with 371 additions and 7 deletions.
23 changes: 17 additions & 6 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,8 @@ import:
- package: github.com/denisbrodbeck/machineid
version: v1.0.1
- package: github.com/google/uuid
version: v1.1.2
version: v1.1.2
- package: github.com/adshao/go-binance
version: v2.3.0
subpackages:
- v2
299 changes: 299 additions & 0 deletions plugins/binanceExchange_ws.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,299 @@
package plugins

import (
"fmt"
"log"
"strconv"
"strings"
"sync"
"time"

"github.com/adshao/go-binance/v2"
"github.com/stellar/kelp/api"
"github.com/stellar/kelp/model"
)

const (
STREAM_TICKER_FMT = "%s@ticker"
TTLTIME = time.Second * 3 // ttl time in seconds
)

var (
timeToWaitForFirstEvent = time.Second
)

var (
ErrConversionWsMarketEvent = errConversion{from: "interface", to: "*binance.WsMarketStatEvent"}
)

type errMissingSymbol struct {
symbol string
}

func (err errMissingSymbol) Error() string {
return fmt.Sprintf("Symbol %s is missing from exchange intizialization", err.symbol)
}

type errConversion struct {
from string
to string
}

func (err errConversion) Error() string {
return fmt.Sprintf("Error conversion from %s to %s", err.from, err.to)
}

type stream struct {
doneC chan struct{}
stopC chan struct{}
cleanup func()
}

//Wait until the stream ends
func (s stream) Wait() {

if s.doneC == nil {
return
}

<-s.doneC
}

//Close the stream and cleanup any data
func (s stream) Close() {
if s.stopC == nil {
return
}
s.stopC <- struct{}{}
s.stopC = nil

if s.cleanup != nil {
s.cleanup()
}
}

//mapData... struct used to data from events and timestamp when they are cached
type mapData struct {
data interface{}
createdAt time.Time
}

//isStatle... check if data it's stale
func isStale(data mapData, ttl time.Duration) bool {

return time.Now().Sub(data.createdAt).Seconds() > ttl.Seconds()
}

//struct used to cache events
type mapEvents struct {
data map[string]mapData
mtx *sync.RWMutex
}

//Set ... set value
func (m *mapEvents) Set(key string, data interface{}) {

now := time.Now()

m.mtx.Lock()
defer m.mtx.Unlock()

m.data[key] = mapData{
data: data,
createdAt: now,
}

}

//Get ... get value
func (m *mapEvents) Get(key string) (mapData, bool) {
m.mtx.RLock()
defer m.mtx.RUnlock()

data, isData := m.data[key]

return data, isData
}

//Del ... delete cached value
func (m *mapEvents) Del(key string) {
m.mtx.Lock()
defer m.mtx.Unlock()

delete(m.data, key)

}

// create new map for cache
func makeMapEvents() *mapEvents {
return &mapEvents{
data: make(map[string]mapData),
mtx: &sync.RWMutex{},
}
}

//struct used to keep all cached data
type events struct {
SymbolStats *mapEvents
}

func createStateEvents() *events {
events := &events{
SymbolStats: makeMapEvents(),
}

return events
}

// subscribe for symbol@ticker
func subcribeTicker(symbol string, state *mapEvents) (*stream, error) {

wsMarketStatHandler := func(ticker *binance.WsMarketStatEvent) {
state.Set(symbol, ticker)
}

errHandler := func(err error) {
log.Printf("Error WsMarketsStat for symbol %s: %v\n", symbol, err)
}

doneC, stopC, err := binance.WsMarketStatServe(symbol, wsMarketStatHandler, errHandler)

if err != nil {
return nil, err
}

return &stream{doneC: doneC, stopC: stopC, cleanup: func() {
state.Del(symbol)
}}, err

}

type binanceExchangeWs struct {
events *events

streams map[string]*stream
streamLock *sync.Mutex

assetConverter model.AssetConverterInterface
delimiter string
}

// makeBinanceWs is a factory method to make an binance exchange over ws
func makeBinanceWs() (*binanceExchangeWs, error) {

binance.WebsocketKeepalive = true

events := createStateEvents()

beWs := &binanceExchangeWs{
events: events,
delimiter: "",
assetConverter: model.CcxtAssetConverter,
streamLock: &sync.Mutex{},
streams: make(map[string]*stream),
}

return beWs, nil
}

//getPrceision... get precision for float string
func getPrecision(floatStr string) int8 {

strs := strings.Split(floatStr, ".")

if len(strs) != 2 {
log.Printf("could not get precision for float %s\n", floatStr)
return 0
}

return int8(len(strs[1]))
}

// GetTickerPrice impl.
func (beWs *binanceExchangeWs) GetTickerPrice(pairs []model.TradingPair) (map[model.TradingPair]api.Ticker, error) {

priceResult := map[model.TradingPair]api.Ticker{}
for _, p := range pairs {

symbol, err := p.ToString(beWs.assetConverter, beWs.delimiter)

if err != nil {
return nil, err
}

tickerData, isTicker := beWs.events.SymbolStats.Get(symbol)

if !isTicker {
stream, err := subcribeTicker(symbol, beWs.events.SymbolStats)

if err != nil {
return nil, fmt.Errorf("error when subscribing for %s: %s", symbol, err)
}

//Store stream
beWs.streamLock.Lock()
beWs.streams[fmt.Sprintf(STREAM_TICKER_FMT, symbol)] = stream
beWs.streamLock.Unlock()

//Wait for binance to send events
time.Sleep(timeToWaitForFirstEvent)

tickerData, isTicker = beWs.events.SymbolStats.Get(symbol)

//We couldn't subscribe for this pair
if !isTicker {
return nil, fmt.Errorf("error while fetching ticker price for trading pair %s", symbol)
}

}

//Show how old is the ticker
log.Printf("Ticker for %s is %d milliseconds old!\n", symbol, time.Now().Sub(tickerData.createdAt).Milliseconds())

if isStale(tickerData, TTLTIME) {
return nil, fmt.Errorf("ticker for %s symbols is older than %v", symbol, TTLTIME)
}

tickerI := tickerData.data

//Convert to WsMarketStatEvent
ticker, isOk := tickerI.(*binance.WsMarketStatEvent)

if !isOk {
return nil, ErrConversionWsMarketEvent
}

askPrice, e := strconv.ParseFloat(ticker.AskPrice, 64)
if e != nil {
return nil, fmt.Errorf("unable to correctly parse 'ask': %s", e)
}
bidPrice, e := strconv.ParseFloat(ticker.BidPrice, 64)
if e != nil {
return nil, fmt.Errorf("unable to correctly parse 'bid': %s", e)
}
lastPrice, e := strconv.ParseFloat(ticker.LastPrice, 64)
if e != nil {
return nil, fmt.Errorf("unable to correctly parse 'last': %s", e)
}

priceResult[p] = api.Ticker{
AskPrice: model.NumberFromFloat(askPrice, getPrecision(ticker.AskPrice)),
BidPrice: model.NumberFromFloat(bidPrice, getPrecision(ticker.BidPrice)),
LastPrice: model.NumberFromFloat(lastPrice, getPrecision(ticker.LastPrice)),
}
}

return priceResult, nil
}

//Unsubscribe ... unsubscribe from binance streams
func (beWs *binanceExchangeWs) Unsubscribe(stream string) {

beWs.streamLock.Lock()

if stream, isStream := beWs.streams[stream]; isStream {
stream.Close()
}

beWs.streamLock.Unlock()
}
Loading

0 comments on commit 3e8093d

Please sign in to comment.