Skip to content

Commit

Permalink
Remove multierror to fix error in error handling
Browse files Browse the repository at this point in the history
multierror struct has a formatting closure, making it unserializable
Thus, multierror cannot be stored in temporal state

Replace with jsonerr, with fallback logic for non serializable errors
  • Loading branch information
serprex committed Dec 16, 2023
1 parent 4d0d18a commit 808d48f
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 17 deletions.
2 changes: 0 additions & 2 deletions flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ require (
github.com/google/uuid v1.4.0
github.com/grafana/pyroscope-go v1.0.4
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1
github.com/hashicorp/go-multierror v1.1.1
github.com/jackc/pglogrepl v0.0.0-20231111135425-1627ab1b5780
github.com/jackc/pgx/v5 v5.5.1
github.com/jmoiron/sqlx v1.3.5
Expand Down Expand Up @@ -124,7 +123,6 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect
Expand Down
5 changes: 0 additions & 5 deletions flow/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,6 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 h1:6UKoz5ujsI55KNpsJH3UwCq3T8k
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1/go.mod h1:YvJ2f6MplWDhfxiUC3KpyTy76kYUZA4W3pTv/wdKQ9Y=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=
github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE=
Expand Down
43 changes: 43 additions & 0 deletions flow/shared/jsonerr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package shared

import "encoding/json"

type ErrString struct {
Message string
}

func (e ErrString) Error() string {
return e.Message
}

type JSONErr struct {
E error
}

func (je JSONErr) Error() string {
return je.E.Error()
}

func (je JSONErr) MarshalJSON() ([]byte, error) {
if jm, ok := je.E.(json.Marshaler); ok {
return jm.MarshalJSON()
} else {
return json.Marshal(je.E.Error())
}
}

func (je *JSONErr) UnmarshalJSON(data []byte) error {
var res error
err := json.Unmarshal(data, &res)
if err == nil {
*je = JSONErr{E: res}
return nil
}
var msg string
err = json.Unmarshal(data, &msg)
if err != nil {
return err
}
*je = JSONErr{ErrString{Message: msg}}
return nil
}
61 changes: 61 additions & 0 deletions flow/shared/jsonerr_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package shared

import "encoding/json"
import "testing"

type jsonSafeErr struct {
Err string
}

func (jse jsonSafeErr) Error() string {
return jse.Err
}

type jsonUnsafeErr struct {
Err string
Fn func()
}

func (jue jsonUnsafeErr) Error() string {
return jue.Err
}

func TestJsonSafeError(t *testing.T) {
e := jsonSafeErr{Err: "test"}
je := JSONErr{E: e}
j, err := json.Marshal(je)
if err != nil {
t.Error(err)
}
t.Log(j)

var newje JSONErr
err = json.Unmarshal(j, &newje)
if err != nil {
t.Error(err)
}
errmsg := newje.Error()
if errmsg != "test" {
t.Error("Expected 'test'", errmsg)
}
}

func TestJsonUnsafeError(t *testing.T) {
e := jsonUnsafeErr{Err: "test", Fn: func() {}}
je := JSONErr{E: e}
j, err := json.Marshal(je)
if err != nil {
t.Error(err)
}
t.Log(j)

var newje JSONErr
err = json.Unmarshal(j, &newje)
if err != nil {
t.Error(err)
}
errmsg := newje.Error()
if errmsg != "test" {
t.Error("Expected 'test'", errmsg)
}
}
19 changes: 9 additions & 10 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/google/uuid"
"github.com/hashicorp/go-multierror"
"go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
Expand Down Expand Up @@ -50,9 +49,9 @@ type CDCFlowWorkflowState struct {
// SnapshotComplete indicates whether the initial snapshot workflow has completed.
SnapshotComplete bool
// Errors encountered during child sync flow executions.
SyncFlowErrors error
SyncFlowErrors []shared.JSONErr
// Errors encountered during child sync flow executions.
NormalizeFlowErrors error
NormalizeFlowErrors []shared.JSONErr
// Global mapping of relation IDs to RelationMessages sent as a part of logical replication.
// Needed to support schema changes.
RelationMessageMapping *model.RelationMessageMapping
Expand All @@ -79,7 +78,7 @@ func NewCDCFlowWorkflowState() *CDCFlowWorkflowState {
}

// truncate the progress and other arrays to a max of 10 elements
func (s *CDCFlowWorkflowState) TruncateProgress() {
func (s *CDCFlowWorkflowState) TruncateProgress(logger log.Logger) {
if len(s.Progress) > 10 {
s.Progress = s.Progress[len(s.Progress)-10:]
}
Expand All @@ -91,12 +90,12 @@ func (s *CDCFlowWorkflowState) TruncateProgress() {
}

if s.SyncFlowErrors != nil {
fmt.Println("SyncFlowErrors: ", s.SyncFlowErrors)
logger.Warn("SyncFlowErrors: ", s.SyncFlowErrors)
s.SyncFlowErrors = nil
}

if s.NormalizeFlowErrors != nil {
fmt.Println("NormalizeFlowErrors: ", s.NormalizeFlowErrors)
logger.Warn("NormalizeFlowErrors: ", s.NormalizeFlowErrors)
s.NormalizeFlowErrors = nil
}
}
Expand Down Expand Up @@ -373,7 +372,7 @@ func CDCFlowWorkflowWithConfig(
var childSyncFlowRes *model.SyncResponse
if err := childSyncFlowFuture.Get(ctx, &childSyncFlowRes); err != nil {
w.logger.Error("failed to execute sync flow: ", err)
state.SyncFlowErrors = multierror.Append(state.SyncFlowErrors, err)
state.SyncFlowErrors = append(state.SyncFlowErrors, shared.JSONErr{E: err})
} else {
state.SyncFlowStatuses = append(state.SyncFlowStatuses, childSyncFlowRes)
if childSyncFlowRes != nil {
Expand Down Expand Up @@ -426,7 +425,7 @@ func CDCFlowWorkflowWithConfig(
var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput
if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil {
w.logger.Error("failed to execute schema update at source: ", err)
state.SyncFlowErrors = multierror.Append(state.SyncFlowErrors, err)
state.SyncFlowErrors = append(state.SyncFlowErrors, shared.JSONErr{E: err})
} else {
for i := range modifiedSrcTables {
cfg.TableNameSchemaMapping[modifiedDstTables[i]] =
Expand All @@ -446,7 +445,7 @@ func CDCFlowWorkflowWithConfig(
var childNormalizeFlowRes *model.NormalizeResponse
if err := f.Get(ctx, &childNormalizeFlowRes); err != nil {
w.logger.Error("failed to execute normalize flow: ", err)
state.NormalizeFlowErrors = multierror.Append(state.NormalizeFlowErrors, err)
state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, shared.JSONErr{E: err})
} else {
state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes)
}
Expand All @@ -455,6 +454,6 @@ func CDCFlowWorkflowWithConfig(
batchSizeSelector.Select(ctx)
}

state.TruncateProgress()
state.TruncateProgress(w.logger)
return nil, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, limits, state)
}

0 comments on commit 808d48f

Please sign in to comment.