Skip to content

Commit

Permalink
Merge pull request #113 from AM-979/patch-1
Browse files Browse the repository at this point in the history
Add more exchange from xeggex
  • Loading branch information
yuriy0803 authored Nov 29, 2024
2 parents bc4b149 + 56a1a6a commit 1b44995
Showing 1 changed file with 76 additions and 14 deletions.
90 changes: 76 additions & 14 deletions exchange/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ type RestClient struct {
}

type ExchangeReply1 map[string]interface{}

type ExchangeReply []map[string]interface{}

// NewRestClient creates a new RestClient
func NewRestClient(name, url, timeout string) *RestClient {
restClient := &RestClient{Name: name, Url: url}
timeoutIntv := util.MustParseDuration(timeout)
Expand All @@ -50,38 +50,97 @@ func NewRestClient(name, url, timeout string) *RestClient {
return restClient
}

func (r *RestClient) GetData() (ExchangeReply, error) {
// GetData fetches the data from the given URL and processes it
func (r *RestClient) GetData() ([]map[string]interface{}, error) {
// If the name is "xeggex", use the xeggex-specific code
if r.Name == "xeggex" {
resp, err := r.doPost(r.Url, "ticker") // Use the URL specified in RestClient
if err != nil {
return nil, err
}

// Data structure for the new API response
var data map[string]interface{}
err = json.Unmarshal(resp, &data)
if err != nil {
log.Printf("Failed to parse response: %v", err)
return nil, err
}

// Extract only base_currency and last_price
result := []map[string]interface{}{
{
"symbol": data["base_currency"].(string), // symbol = base_currency
"current_price": data["last_price"].(string), // price = last_price
},
}
return result, nil
}

// If the name is "coingecko", use the coingecko-specific code
resp, err := r.doPost(r.Url, "ticker")
if err != nil {
return nil, err
}

// Attempt to interpret the response as a slice of maps
// Data structure for the new API response
var data ExchangeReply
err = json.Unmarshal(resp, &data)
if err == nil {
return data, nil
}

// If interpreting as a slice of maps fails, try to interpret it as a single map
// If decoding as a slice of maps failed, try decoding as a single map
var dataSingle ExchangeReply1
err = json.Unmarshal(resp, &dataSingle)
if err == nil {
// Convert the single map into a slice of maps with only one entry
// Convert from single map to a slice of maps with just one entry
data = ExchangeReply{dataSingle}
return data, nil
}

return nil, err
}

// StartExchangeProcessor starts the exchange processor
func StartExchangeProcessor(cfg *ExchangeConfig, backend *storage.RedisClient) *ExchangeProcessor {
u := &ExchangeProcessor{ExchangeConfig: cfg, backend: backend}
u.rpc = NewRestClient("ExchangeProcessor", cfg.Url, cfg.Timeout)
var u *ExchangeProcessor

// If ExchangeConfig.Name is not specified, set it to "coingecko" as the default
if cfg.Name == "" {
cfg.Name = "coingecko"
}

// Check the value of ExchangeConfig.Name
if cfg.Name == "xeggex" {
// If it's xeggex, use the xeggex-specific code
u = &ExchangeProcessor{ExchangeConfig: cfg, backend: backend}
u.rpc = NewRestClient("xeggex", cfg.Url, cfg.Timeout)
} else if cfg.Name == "coingecko" {
// If it's coingecko, use the coingecko-specific code
u = &ExchangeProcessor{ExchangeConfig: cfg, backend: backend}
u.rpc = NewRestClient("coingecko", cfg.Url, cfg.Timeout)
} else {
log.Printf("Unsupported exchange: %s", cfg.Name)
// Print a message if ExchangeConfig.Name is not supported
return nil
}

// Check if u is nil before calling Start
if u == nil {
log.Printf("Failed to initialize ExchangeProcessor")
return nil
}
return u
}

// Start begins the periodic fetching of data
func (u *ExchangeProcessor) Start() {
if u == nil {
log.Printf("ExchangeProcessor is not initialized.")
return
}

refreshIntv := util.MustParseDuration(u.ExchangeConfig.RefreshInterval)
refreshTimer := time.NewTimer(refreshIntv)
log.Printf("Set Exchange data refresh every %v", refreshIntv)
Expand All @@ -100,24 +159,27 @@ func (u *ExchangeProcessor) Start() {
}()
}

// fetchData fetches data from the exchange and stores it
func (u *ExchangeProcessor) fetchData() {
reply, err := u.rpc.GetData()

if err != nil {
log.Printf("Failed to fetch data from exchange %v", err)
if u == nil {
log.Printf("ExchangeProcessor is not initialized.")
return
}

u.backend.StoreExchangeData(reply)
reply, err := u.rpc.GetData()

if err != nil {
log.Printf("Failed to store the data to exchange %v", err)
log.Printf("Failed to fetch data from exchange: %v", err)
return
}

return
// Send the data to StoreExchangeData directly
u.backend.StoreExchangeData(reply)

log.Printf("Exchange data fetched and stored successfully.")
}

// doPost sends an HTTP GET request
func (r *RestClient) doPost(url string, method string) ([]byte, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
Expand Down

0 comments on commit 1b44995

Please sign in to comment.