Skip to content
This repository has been archived by the owner on Sep 12, 2019. It is now read-only.

Commit

Permalink
Merge pull request #68 from stellar/fix-listener-error-loop
Browse files Browse the repository at this point in the history
Fix error loop in PaymentListener
  • Loading branch information
bartekn authored Aug 6, 2017
2 parents a6012ad + a350a69 commit 1b8b88d
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 82 deletions.
8 changes: 4 additions & 4 deletions src/github.com/stellar/gateway/bridge/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package config

import (
"errors"
"net/url"
"github.com/stellar/go/keypair"
"net/url"
"regexp"
)

Expand Down Expand Up @@ -71,15 +71,15 @@ func (c *Config) Validate() (err error) {
for _, asset := range c.Assets {
if asset.Issuer == "" {
if asset.Code != "XLM" {
err = errors.New("Issuer param is required for "+asset.Code)
err = errors.New("Issuer param is required for " + asset.Code)
return
}
}

if asset.Issuer != "" {
_, err = keypair.Parse(asset.Issuer)
if err != nil {
err = errors.New("Issuing account is invalid for "+asset.Code)
err = errors.New("Issuing account is invalid for " + asset.Code)
return
}
}
Expand All @@ -90,7 +90,7 @@ func (c *Config) Validate() (err error) {
}

if !matched {
err = errors.New("Invalid asset code: "+asset.Code)
err = errors.New("Invalid asset code: " + asset.Code)
return err
}
}
Expand Down
89 changes: 47 additions & 42 deletions src/github.com/stellar/gateway/listener/payment_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,25 @@ func (pl *PaymentListener) ReprocessPayment(payment horizon.PaymentResponse, for
return errors.New("Trying to reprocess successful transaction without force")
}

existingPayment.Status = "Reprocessing..."
existingPayment.ProcessedAt = pl.now()

return pl.process(payment, existingPayment)
err = pl.entityManager.Persist(existingPayment)
if err != nil {
return err
}

err = pl.process(payment)

if err != nil {
pl.log.WithFields(logrus.Fields{"err": err}).Error("Payment reprocessed with errors")
existingPayment.Status = err.Error()
} else {
pl.log.Info("Payment successfully reprocessed")
existingPayment.Status = "Success"
}

return pl.entityManager.Persist(existingPayment)
}

func (pl *PaymentListener) onPayment(payment horizon.PaymentResponse) (err error) {
Expand All @@ -166,40 +182,43 @@ func (pl *PaymentListener) onPayment(payment horizon.PaymentResponse) (err error
OperationID: payment.ID,
ProcessedAt: pl.now(),
PagingToken: payment.PagingToken,
Status: "Processing...",
}

return pl.process(payment, dbPayment)
}

func (pl *PaymentListener) process(payment horizon.PaymentResponse, dbPayment *entities.ReceivedPayment) (err error) {
savePayment := func(payment *entities.ReceivedPayment) (err error) {
pl.log.Info(payment.Status)
err = pl.entityManager.Persist(payment)
err = pl.entityManager.Persist(dbPayment)
if err != nil {
return
}

err = pl.process(payment)

if err != nil {
pl.log.WithFields(logrus.Fields{"err": err}).Error("Payment processed with errors")
dbPayment.Status = err.Error()
} else {
pl.log.Info("Payment successfully processed")
dbPayment.Status = "Success"
}

return pl.entityManager.Persist(dbPayment)
}

func (pl *PaymentListener) process(payment horizon.PaymentResponse) error {
if payment.Type != "payment" && payment.Type != "path_payment" {
dbPayment.Status = "Not a payment operation"
savePayment(dbPayment)
return
return errors.New("Not a payment operation")
}

if payment.To != pl.config.Accounts.ReceivingAccountID {
dbPayment.Status = "Operation sent not received"
savePayment(dbPayment)
return nil
return errors.New("Operation sent not received")
}

if !pl.isAssetAllowed(payment.AssetType, payment.AssetCode, payment.AssetIssuer) {
dbPayment.Status = "Asset not allowed"
savePayment(dbPayment)
return nil
return errors.New("Asset not allowed")
}

err = pl.horizon.LoadMemo(&payment)
err := pl.horizon.LoadMemo(&payment)
if err != nil {
pl.log.Error("Unable to load transaction memo")
return err
return errors.Wrap(err, "Unable to load transaction memo")
}

pl.log.WithFields(logrus.Fields{"memo": payment.Memo.Value, "type": payment.Memo.Type}).Info("Loaded memo")
Expand All @@ -215,43 +234,38 @@ func (pl *PaymentListener) process(payment horizon.PaymentResponse, dbPayment *e
pl.log.WithFields(logrus.Fields{"url": complianceRequestURL, "body": complianceRequestBody}).Error("Sending request to compliance server")
resp, err := pl.postForm(complianceRequestURL, complianceRequestBody)
if err != nil {
pl.log.WithFields(logrus.Fields{"err": err}).Error("Error sending request to compliance server")
return err
return errors.Wrap(err, "Error sending request to compliance server")
}

defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
pl.log.Error("Error reading compliance server response")
return err
return errors.Wrap(err, "Error reading compliance server response")
}

if resp.StatusCode != 200 {
pl.log.WithFields(logrus.Fields{
"status": resp.StatusCode,
"body": string(body),
}).Error("Error response from compliance server")
return err
return errors.New("Error response from compliance server")
}

err = json.Unmarshal([]byte(body), &receiveResponse)
if err != nil {
pl.log.WithFields(logrus.Fields{"err": err}).Error("Cannot unmarshal receiveResponse")
return err
return errors.Wrap(err, "Cannot unmarshal receiveResponse")
}

var authData compliance.AuthData
err = json.Unmarshal([]byte(receiveResponse.Data), &authData)
if err != nil {
pl.log.WithFields(logrus.Fields{"err": err}).Error("Cannot unmarshal authData")
return err
return errors.Wrap(err, "Cannot unmarshal authData")
}

var attachment compliance.Attachment
err = json.Unmarshal([]byte(authData.AttachmentJSON), &attachment)
if err != nil {
pl.log.WithFields(logrus.Fields{"err": err}).Error("Cannot unmarshal memo")
return err
return errors.Wrap(err, "Cannot unmarshal memo")
}

route = attachment.Transaction.Route
Expand All @@ -274,16 +288,14 @@ func (pl *PaymentListener) process(payment horizon.PaymentResponse, dbPayment *e
},
)
if err != nil {
pl.log.Error("Error sending request to receive callback")
return err
return errors.Wrap(err, "Error sending request to receive callback")
}

if resp.StatusCode != 200 {
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
pl.log.Error("Error reading receive callback response")
return err
return errors.Wrap(err, "Error reading receive callback response")
}

pl.log.WithFields(logrus.Fields{
Expand All @@ -293,13 +305,6 @@ func (pl *PaymentListener) process(payment horizon.PaymentResponse, dbPayment *e
return errors.New("Error response from receive callback")
}

dbPayment.Status = "Success"
err = savePayment(dbPayment)
if err != nil {
pl.log.Error("Error saving payment to the DB")
return err
}

return nil
}

Expand Down
Loading

0 comments on commit 1b8b88d

Please sign in to comment.