diff --git a/e2e_test.go b/e2e_test.go new file mode 100644 index 0000000..0b899f0 --- /dev/null +++ b/e2e_test.go @@ -0,0 +1,56 @@ +package wsgateway_test + +import ( + wsgateway "github.com/AlnsV/go-crypto-ws-gateway" + "github.com/AlnsV/go-crypto-ws-gateway/pkg/model" + "github.com/caarlos0/env" + "github.com/pkg/errors" + logger "github.com/sirupsen/logrus" + "testing" + "time" +) + +type config struct { + FTXAPIKey string `env:"FTX_API_KEY"` + FTXAPISecret string `env:"FTX_API_SECRET"` +} + +func New() (*config, error) { + var cfg config + + if err := env.Parse(&cfg); err != nil { + return nil, errors.Wrap(err, "error with initializing config") + } + + return &cfg, nil +} + +func TestGateway(t *testing.T) { + cfg, _ := New() + client, err := wsgateway.BuildWSClient( + "FTX", + cfg.FTXAPIKey, + cfg.FTXAPISecret, + ) + if err != nil { + logger.Error(err) + } + + err = client.Connect() + if err != nil { + logger.Error(err) + } + + err = client.Listen( + []string{"BTC-PERP", "SOL-PERP"}, + func(trade *model.Trade) { + logger.Infoln(trade) + }, + ) + if err != nil { + logger.Error(err) + } + + time.Sleep(2 * time.Second) + client.Close() +} diff --git a/wsgateway/exchanges/ftx.go b/exchanges/ftx.go similarity index 65% rename from wsgateway/exchanges/ftx.go rename to exchanges/ftx.go index cff4bc9..ee174c9 100644 --- a/wsgateway/exchanges/ftx.go +++ b/exchanges/ftx.go @@ -5,21 +5,26 @@ import ( "crypto/sha256" "encoding/hex" "fmt" - "github.com/AlnsV/go-crypto-ws-gateway/wsgateway/internal" - "github.com/sirupsen/logrus" + "github.com/AlnsV/go-crypto-ws-gateway/internal" + "github.com/AlnsV/go-crypto-ws-gateway/pkg/model" + "github.com/AlnsV/go-crypto-ws-gateway/pkg/parse" + logger "github.com/sirupsen/logrus" "net/http" "time" ) const address = "wss://ftx.com/ws/" +var ( + timezone, _ = time.LoadLocation("UTC") +) + type FTXWSClient struct { - APIKey string + APIKey string + APISecret string ws *internal.WebsocketClient - - logger *logrus.Logger } func NewFTXWSClient(APIKey string, APISecret string) *FTXWSClient { @@ -27,7 +32,6 @@ func NewFTXWSClient(APIKey string, APISecret string) *FTXWSClient { APIKey: APIKey, APISecret: APISecret, ws: internal.NewWebsocketClient(address, true), - logger: logrus.New(), } } @@ -43,7 +47,7 @@ func (f FTXWSClient) Connect() error { if err != nil { return err } - f.logger.Infoln(response) + logger.Infoln(response) ts := time.Now().Second() loginMsg := map[string]interface{}{ @@ -57,7 +61,7 @@ func (f FTXWSClient) Connect() error { }, } - f.logger.Infoln(loginMsg) + logger.Infoln(loginMsg) err = f.ws.SendMessageJSON(loginMsg) if err != nil { @@ -75,7 +79,7 @@ func (f FTXWSClient) subscribe(pairs []string) error { "market": pair, } - f.logger.Infoln(loginMsg) + logger.Infoln(loginMsg) err = f.ws.SendMessageJSON(loginMsg) if err != nil { @@ -85,7 +89,7 @@ func (f FTXWSClient) subscribe(pairs []string) error { return err } -func (f FTXWSClient) Listen(instruments []string, receiver func(map[string]interface{})) error { +func (f FTXWSClient) Listen(instruments []string, receiver func(trade *model.Trade)) error { err := f.subscribe(instruments) if err != nil { return err @@ -97,9 +101,23 @@ func (f FTXWSClient) Listen(instruments []string, receiver func(map[string]inter for { select { case msg := <-messageContainer: - f.logger.Infoln(msg) - // TODO(JV): Unify msg format - receiver(msg) + logger.Infoln(msg) + + if trades, ok := msg["data"]; ok { + for _, rawTrade := range trades.([]interface{}) { + trade := rawTrade.(map[string]interface{}) + timestamp, _ := parse.ParseTimestamp(trade["time"].(string), timezone) + + newTrade := &model.Trade{ + Price: trade["price"].(float64), + Side: trade["side"].(string), + Size: trade["size"].(float64), + Timestamp: timestamp, + Market: msg["market"].(string), + } + receiver(newTrade) + } + } } } }() @@ -108,6 +126,5 @@ func (f FTXWSClient) Listen(instruments []string, receiver func(map[string]inter } func (f FTXWSClient) Close() { - //TODO implement me - panic("implement me") + f.ws.Close() } diff --git a/wsgateway/factory.go b/factory.go similarity index 55% rename from wsgateway/factory.go rename to factory.go index 9db2208..d8b90d1 100644 --- a/wsgateway/factory.go +++ b/factory.go @@ -2,12 +2,13 @@ package wsgateway import ( "fmt" - "github.com/AlnsV/go-crypto-ws-gateway/wsgateway/exchanges" + "github.com/AlnsV/go-crypto-ws-gateway/exchanges" + "github.com/AlnsV/go-crypto-ws-gateway/pkg/model" ) type ExchangeWSClient interface { Connect() error - Listen([]string, func(map[string]interface{})) error + Listen([]string, func(trade *model.Trade)) error Close() } @@ -15,5 +16,5 @@ func BuildWSClient(exchange, APIKey, APISecret string) (ExchangeWSClient, error) if exchange == "FTX" { return exchanges.NewFTXWSClient(APIKey, APISecret), nil } - return nil, fmt.Errorf("exchange: %s doesn't exists") + return nil, fmt.Errorf("exchange: %s doesn't exists", exchange) } diff --git a/go.mod b/go.mod index 75f7529..dffe8f2 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,11 @@ module github.com/AlnsV/go-crypto-ws-gateway - go 1.17 require ( + github.com/caarlos0/env v3.5.0+incompatible github.com/gorilla/websocket v1.4.2 + github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.8.1 ) diff --git a/go.sum b/go.sum index aecffff..c227862 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,11 @@ +github.com/caarlos0/env v3.5.0+incompatible h1:Yy0UN8o9Wtr/jGHZDpCBLpNrzcFLLM2yixi/rBrKyJs= +github.com/caarlos0/env v3.5.0+incompatible/go.mod h1:tdCsowwCzMLdkqRYDlHpZCp2UooDD3MspDBjZ2AD02Y= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= diff --git a/wsgateway/internal/ws.go b/internal/ws.go similarity index 89% rename from wsgateway/internal/ws.go rename to internal/ws.go index a689adb..8efae17 100644 --- a/wsgateway/internal/ws.go +++ b/internal/ws.go @@ -2,8 +2,8 @@ package internal import ( "encoding/json" - "fmt" "github.com/gorilla/websocket" + logger "github.com/sirupsen/logrus" "log" "net/http" "net/url" @@ -65,9 +65,13 @@ func (ws *WebsocketClient) Listen(messageBuffer chan<- map[string]interface{}) { var msg map[string]interface{} err = json.Unmarshal(message, &msg) if err != nil { - log.Fatal(fmt.Errorf("parsing failure in: %s, err: %s", message, err)) + logger.Warningf("parsing failure in: %s, err: %s", message, err) } else { messageBuffer <- msg } } } + +func (ws *WebsocketClient) Close() { + ws.Close() +} diff --git a/pkg/model/trade.go b/pkg/model/trade.go new file mode 100644 index 0000000..9702508 --- /dev/null +++ b/pkg/model/trade.go @@ -0,0 +1,19 @@ +package model + +import "time" + +type Trade struct { + //Price of the settled trade + Price float64 + + // Side of trade, may be Buy or Sell + Side string + + // Size Amount of currency traded + Size float64 + + Timestamp time.Time + + // Market base-quote pair + Market string +} diff --git a/pkg/parse/timestamp.go b/pkg/parse/timestamp.go new file mode 100644 index 0000000..6de25db --- /dev/null +++ b/pkg/parse/timestamp.go @@ -0,0 +1,31 @@ +package parse + +import ( + "strings" + "time" +) + +func ParseTimestamp(timestamp string, tz *time.Location) (time.Time, error) { + date, err := time.ParseInLocation(time.RFC3339Nano, timestamp, tz) + if err != nil { + date, errTwo := time.ParseInLocation( + time.RFC3339Nano, + strings.Replace(timestamp, " ", "T", 1), + tz, + ) + if errTwo != nil { + dateStr := strings.Replace(timestamp, " ", "T", 1) + date, errThree := time.ParseInLocation( + time.RFC3339Nano, + dateStr+"+00:00", + tz, + ) + if errThree != nil { + return time.Now().UTC(), errThree + } + return date, nil + } + return date, nil + } + return date, nil +} diff --git a/pkg/parse/timestamp_test.go b/pkg/parse/timestamp_test.go new file mode 100644 index 0000000..c72a47c --- /dev/null +++ b/pkg/parse/timestamp_test.go @@ -0,0 +1,33 @@ +package parse + +import ( + "log" + "testing" + "time" +) + +var ( + timezone, _ = time.LoadLocation("UTC") +) + +func TestParseTimestamp(t *testing.T) { + date, err := ParseTimestamp("2020-03-14T12:02:05.000234", timezone) + if err != nil { + t.Log(err) + } + log.Print(date) + dateTwo, errTwo := ParseTimestamp("2020-03-14 12:02:05.000234", timezone) + if errTwo != nil { + t.Log(errTwo) + } + log.Print(dateTwo) + dateThree, errThree := ParseTimestamp("2020-03-14T12:02:05.000234Z", timezone) + if errThree != nil { + t.Log(errThree) + } + log.Print(dateThree) + _, errFour := ParseTimestamp("2020-0314 12:005.000234Z", timezone) + if errFour == nil { + t.Log("timestamp should not be parsed") + } +}