From b41d6fed560529d30f08e82863e0ef39d9617ecc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Sat, 16 Dec 2023 14:26:45 +0000 Subject: [PATCH] Remove multierror to fix error in error handling multierror struct has a formatting closure, making it unserializable Thus, multierror cannot be stored in temporal state Replace with jsonerr, with fallback logic for non serializable errors --- flow/go.mod | 2 -- flow/go.sum | 5 --- flow/shared/jsonerr.go | 43 ++++++++++++++++++++++++++ flow/shared/jsonerr_test.go | 61 +++++++++++++++++++++++++++++++++++++ flow/workflows/cdc_flow.go | 19 ++++++------ 5 files changed, 113 insertions(+), 17 deletions(-) create mode 100644 flow/shared/jsonerr.go create mode 100644 flow/shared/jsonerr_test.go diff --git a/flow/go.mod b/flow/go.mod index f0bb33c999..5dcb66e440 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -15,7 +15,6 @@ require ( github.com/google/uuid v1.4.0 github.com/grafana/pyroscope-go v1.0.4 github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 - github.com/hashicorp/go-multierror v1.1.1 github.com/jackc/pglogrepl v0.0.0-20231111135425-1627ab1b5780 github.com/jackc/pgx/v5 v5.5.1 github.com/jmoiron/sqlx v1.3.5 @@ -124,7 +123,6 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect - github.com/hashicorp/errwrap v1.1.0 // indirect github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect diff --git a/flow/go.sum b/flow/go.sum index 977cb09638..c2145c93e2 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -241,11 +241,6 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 h1:6UKoz5ujsI55KNpsJH3UwCq3T8k github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1/go.mod h1:YvJ2f6MplWDhfxiUC3KpyTy76kYUZA4W3pTv/wdKQ9Y= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0= -github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= -github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= -github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= -github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= -github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= diff --git a/flow/shared/jsonerr.go b/flow/shared/jsonerr.go new file mode 100644 index 0000000000..769ab287a3 --- /dev/null +++ b/flow/shared/jsonerr.go @@ -0,0 +1,43 @@ +package shared + +import "encoding/json" + +type ErrString struct { + Message string +} + +func (e ErrString) Error() string { + return e.Message +} + +type JSONErr struct { + E error +} + +func (je JSONErr) Error() string { + return je.E.Error() +} + +func (je JSONErr) MarshalJSON() ([]byte, error) { + if jm, ok := je.E.(json.Marshaler); ok { + return jm.MarshalJSON() + } else { + return json.Marshal(je.E.Error()) + } +} + +func (je *JSONErr) UnmarshalJSON(data []byte) error { + var res error + err := json.Unmarshal(data, &res) + if err == nil { + *je = JSONErr{E: res} + return nil + } + var msg string + err = json.Unmarshal(data, &msg) + if err != nil { + return err + } + *je = JSONErr{ErrString{Message: msg}} + return nil +} diff --git a/flow/shared/jsonerr_test.go b/flow/shared/jsonerr_test.go new file mode 100644 index 0000000000..6614d6d07a --- /dev/null +++ b/flow/shared/jsonerr_test.go @@ -0,0 +1,61 @@ +package shared + +import "encoding/json" +import "testing" + +type jsonSafeErr struct { + Err string +} + +func (jse jsonSafeErr) Error() string { + return jse.Err +} + +type jsonUnsafeErr struct { + Err string + Fn func() +} + +func (jue jsonUnsafeErr) Error() string { + return jue.Err +} + +func TestJsonSafeError(t *testing.T) { + e := jsonSafeErr{Err: "test"} + je := JSONErr{E: e} + j, err := json.Marshal(je) + if err != nil { + t.Error(err) + } + t.Log(j) + + var newje JSONErr + err = json.Unmarshal(j, &newje) + if err != nil { + t.Error(err) + } + errmsg := newje.Error() + if errmsg != "test" { + t.Error("Expected 'test'", errmsg) + } +} + +func TestJsonUnsafeError(t *testing.T) { + e := jsonUnsafeErr{Err: "test", Fn: func() {}} + je := JSONErr{E: e} + j, err := json.Marshal(je) + if err != nil { + t.Error(err) + } + t.Log(j) + + var newje JSONErr + err = json.Unmarshal(j, &newje) + if err != nil { + t.Error(err) + } + errmsg := newje.Error() + if errmsg != "test" { + t.Error("Expected 'test'", errmsg) + } +} diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 4e0f760223..a62a99ac28 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -9,7 +9,6 @@ import ( "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" "github.com/google/uuid" - "github.com/hashicorp/go-multierror" "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" @@ -50,9 +49,9 @@ type CDCFlowWorkflowState struct { // SnapshotComplete indicates whether the initial snapshot workflow has completed. SnapshotComplete bool // Errors encountered during child sync flow executions. - SyncFlowErrors error + SyncFlowErrors []shared.JSONErr // Errors encountered during child sync flow executions. - NormalizeFlowErrors error + NormalizeFlowErrors []shared.JSONErr // Global mapping of relation IDs to RelationMessages sent as a part of logical replication. // Needed to support schema changes. RelationMessageMapping *model.RelationMessageMapping @@ -79,7 +78,7 @@ func NewCDCFlowWorkflowState() *CDCFlowWorkflowState { } // truncate the progress and other arrays to a max of 10 elements -func (s *CDCFlowWorkflowState) TruncateProgress() { +func (s *CDCFlowWorkflowState) TruncateProgress(logger log.Logger) { if len(s.Progress) > 10 { s.Progress = s.Progress[len(s.Progress)-10:] } @@ -91,12 +90,12 @@ func (s *CDCFlowWorkflowState) TruncateProgress() { } if s.SyncFlowErrors != nil { - fmt.Println("SyncFlowErrors: ", s.SyncFlowErrors) + logger.Warn("SyncFlowErrors: ", s.SyncFlowErrors) s.SyncFlowErrors = nil } if s.NormalizeFlowErrors != nil { - fmt.Println("NormalizeFlowErrors: ", s.NormalizeFlowErrors) + logger.Warn("NormalizeFlowErrors: ", s.NormalizeFlowErrors) s.NormalizeFlowErrors = nil } } @@ -373,7 +372,7 @@ func CDCFlowWorkflowWithConfig( var childSyncFlowRes *model.SyncResponse if err := childSyncFlowFuture.Get(ctx, &childSyncFlowRes); err != nil { w.logger.Error("failed to execute sync flow: ", err) - state.SyncFlowErrors = multierror.Append(state.SyncFlowErrors, err) + state.SyncFlowErrors = append(state.SyncFlowErrors, shared.JSONErr{E: err}) } else { state.SyncFlowStatuses = append(state.SyncFlowStatuses, childSyncFlowRes) if childSyncFlowRes != nil { @@ -426,7 +425,7 @@ func CDCFlowWorkflowWithConfig( var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil { w.logger.Error("failed to execute schema update at source: ", err) - state.SyncFlowErrors = multierror.Append(state.SyncFlowErrors, err) + state.SyncFlowErrors = append(state.SyncFlowErrors, shared.JSONErr{E: err}) } else { for i := range modifiedSrcTables { cfg.TableNameSchemaMapping[modifiedDstTables[i]] = @@ -446,7 +445,7 @@ func CDCFlowWorkflowWithConfig( var childNormalizeFlowRes *model.NormalizeResponse if err := f.Get(ctx, &childNormalizeFlowRes); err != nil { w.logger.Error("failed to execute normalize flow: ", err) - state.NormalizeFlowErrors = multierror.Append(state.NormalizeFlowErrors, err) + state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, shared.JSONErr{E: err}) } else { state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes) } @@ -455,6 +454,6 @@ func CDCFlowWorkflowWithConfig( batchSizeSelector.Select(ctx) } - state.TruncateProgress() + state.TruncateProgress(w.logger) return nil, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, limits, state) }