From f14ed8409cf221de10a5a5626abc7f39be80eaf8 Mon Sep 17 00:00:00 2001 From: didil <1284255+didil@users.noreply.github.com> Date: Thu, 27 Jun 2024 19:58:31 +0100 Subject: [PATCH] json lua transform wip --- cmd/api/main.go | 2 + pkg/models/inhooks_config.go | 32 ++++- pkg/models/inhooks_config_test.go | 123 ++++++++++++++++++ pkg/models/sink.go | 2 + pkg/models/transform.go | 21 +++ pkg/services/inhooks_config_service.go | 39 +++++- pkg/services/message_processor.go | 10 +- pkg/services/message_processor_test.go | 8 +- pkg/services/payload_transformer.go | 22 ++++ pkg/supervisor/ready.go | 14 +- pkg/supervisor/ready_test.go | 65 ++++++++- pkg/supervisor/supervisor.go | 7 + pkg/testsupport/mocks/gen_mocks.sh | 1 + .../mocks/mock_inhooks_config_service.go | 14 ++ .../mocks/mock_message_processor.go | 8 +- .../mocks/mock_payload_transformer.go | 51 ++++++++ 16 files changed, 395 insertions(+), 24 deletions(-) create mode 100644 pkg/models/transform.go create mode 100644 pkg/services/payload_transformer.go create mode 100644 pkg/testsupport/mocks/mock_payload_transformer.go diff --git a/cmd/api/main.go b/cmd/api/main.go index 83b24e8..bddf26f 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -108,6 +108,7 @@ func main() { } cleanupSvc := services.NewCleanupService(redisStore, timeSvc) + payloadTransformer := services.NewPayloadTransformer() svisor := supervisor.NewSupervisor( supervisor.WithLogger(logger), @@ -119,6 +120,7 @@ func main() { supervisor.WithSchedulerService(schedulerSvc), supervisor.WithProcessingRecoveryService(processingRecoverySvc), supervisor.WithCleanupService(cleanupSvc), + supervisor.WithPayloadTransformer(payloadTransformer), ) wg.Add(1) diff --git a/pkg/models/inhooks_config.go b/pkg/models/inhooks_config.go index 49424c6..87c5473 100644 --- a/pkg/models/inhooks_config.go +++ b/pkg/models/inhooks_config.go @@ -10,7 +10,8 @@ import ( ) type InhooksConfig struct { - Flows []*Flow `yaml:"flows"` + Flows []*Flow `yaml:"flows"` + TransformDefinitions []*TransformDefinition `yaml:"transform_definitions"` } var idRegex = regexp.MustCompile(`^[a-zA-Z0-9\-]{1,255}$`) @@ -27,6 +28,28 @@ func ValidateInhooksConfig(appConf *lib.AppConfig, c *InhooksConfig) error { flowIDs := map[string]bool{} sourceSlugs := map[string]bool{} + luaTransformIDs := map[string]bool{} + + if c.TransformDefinitions != nil { + for i, transform := range c.TransformDefinitions { + if !slices.Contains(TransformTypes, transform.Type) { + return fmt.Errorf("invalid transform type: %s. allowed: %v", transform.Type, TransformTypes) + } + + if !idRegex.MatchString(transform.ID) { + return idValidationErr(fmt.Sprintf("transforms[%d].id", i)) + } + + if luaTransformIDs[transform.ID] { + return fmt.Errorf("transform ids must be unique. duplicate transform id: %s", transform.ID) + } + luaTransformIDs[transform.ID] = true + + if transform.Script == "" { + return fmt.Errorf("transform script cannot be empty") + } + } + } for i, f := range c.Flows { if !idRegex.MatchString(f.ID) { @@ -119,6 +142,13 @@ func ValidateInhooksConfig(appConf *lib.AppConfig, c *InhooksConfig) error { return fmt.Errorf("invalid url scheme: %s", sink.URL) } } + + // validate transform + if sink.Transform != nil { + if !luaTransformIDs[sink.Transform.ID] { + return fmt.Errorf("lua transform id not found: %s", sink.Transform.ID) + } + } } } diff --git a/pkg/models/inhooks_config_test.go b/pkg/models/inhooks_config_test.go index acfd72d..c37db55 100644 --- a/pkg/models/inhooks_config_test.go +++ b/pkg/models/inhooks_config_test.go @@ -32,6 +32,9 @@ func TestValidateInhooksConfig_OK(t *testing.T) { Type: "http", URL: "https://example.com/sink", Delay: &delay, + Transform: &Transform{ + ID: "lua-transform-1", + }, }, }, }, @@ -57,6 +60,13 @@ func TestValidateInhooksConfig_OK(t *testing.T) { }, }, }, + TransformDefinitions: []*TransformDefinition{ + { + ID: "lua-transform-1", + Type: TransformTypeLua, + Script: "function transform(data) data.username = data.name end", + }, + }, } assert.NoError(t, ValidateInhooksConfig(appConf, c)) @@ -359,3 +369,116 @@ func TestValidateInhooksConfig_InvalidHMACAlgorithm(t *testing.T) { assert.ErrorContains(t, ValidateInhooksConfig(appConf, c), "invalid hmac algorithm: somealgorithm. allowed: [sha256]") } +func TestValidateInhooksConfig_InexistingTransformID(t *testing.T) { + ctx := context.Background() + appConf, err := testsupport.InitAppConfig(ctx) + assert.NoError(t, err) + + c := &InhooksConfig{ + Flows: []*Flow{ + { + ID: "flow-1", + Source: &Source{ + ID: "source-1", + Slug: "source-1-slug", + Type: "http", + }, + Sinks: []*Sink{ + { + ID: "sink-1", + Type: "http", + URL: "https://example.com/sink", + Transform: &Transform{ + ID: "non-existent-transform", + }, + }, + }, + }, + }, + TransformDefinitions: []*TransformDefinition{ + { + ID: "lua-transform-1", + Type: TransformTypeLua, + Script: "function transform(data) end", + }, + }, + } + + assert.ErrorContains(t, ValidateInhooksConfig(appConf, c), "transform id not found: non-existent-transform") +} + +func TestValidateInhooksConfig_InvalidTransformType(t *testing.T) { + ctx := context.Background() + appConf, err := testsupport.InitAppConfig(ctx) + assert.NoError(t, err) + + c := &InhooksConfig{ + Flows: []*Flow{ + { + ID: "flow-1", + Source: &Source{ + ID: "source-1", + Slug: "source-1-slug", + Type: "http", + }, + Sinks: []*Sink{ + { + ID: "sink-1", + Type: "http", + URL: "https://example.com/sink", + Transform: &Transform{ + ID: "lua-transform-1", + }, + }, + }, + }, + }, + TransformDefinitions: []*TransformDefinition{ + { + ID: "lua-transform-1", + Type: "invalid-type", + Script: "function transform(data) end", + }, + }, + } + + assert.ErrorContains(t, ValidateInhooksConfig(appConf, c), "invalid transform type: invalid-type. allowed: [lua]") +} + +func TestValidateInhooksConfig_EmptyTransformScript(t *testing.T) { + ctx := context.Background() + appConf, err := testsupport.InitAppConfig(ctx) + assert.NoError(t, err) + + c := &InhooksConfig{ + Flows: []*Flow{ + { + ID: "flow-1", + Source: &Source{ + ID: "source-1", + Slug: "source-1-slug", + Type: "http", + }, + Sinks: []*Sink{ + { + ID: "sink-1", + Type: "http", + URL: "https://example.com/sink", + Transform: &Transform{ + ID: "lua-transform-1", + }, + }, + }, + }, + }, + TransformDefinitions: []*TransformDefinition{ + { + ID: "lua-transform-1", + Type: TransformTypeLua, + Script: "", + }, + }, + } + + assert.ErrorContains(t, ValidateInhooksConfig(appConf, c), "transform script cannot be empty") +} diff --git a/pkg/models/sink.go b/pkg/models/sink.go index 4cc5f42..f96e1a8 100644 --- a/pkg/models/sink.go +++ b/pkg/models/sink.go @@ -27,4 +27,6 @@ type Sink struct { RetryExpMultiplier *float64 `yaml:"retryExpMultiplier"` // Max attempts MaxAttempts *int `yaml:"maxAttempts"` + // Transform to apply to the data + Transform *Transform `yaml:"transform"` } diff --git a/pkg/models/transform.go b/pkg/models/transform.go new file mode 100644 index 0000000..1ac9100 --- /dev/null +++ b/pkg/models/transform.go @@ -0,0 +1,21 @@ +package models + +type TransformType string + +const ( + TransformTypeLua = "lua" +) + +var TransformTypes = []TransformType{ + TransformTypeLua, +} + +type TransformDefinition struct { + ID string `yaml:"id"` + Type TransformType `yaml:"type"` + Script string `yaml:"script"` +} + +type Transform struct { + ID string `yaml:"id"` +} diff --git a/pkg/services/inhooks_config_service.go b/pkg/services/inhooks_config_service.go index 3c2d0f1..34cae03 100644 --- a/pkg/services/inhooks_config_service.go +++ b/pkg/services/inhooks_config_service.go @@ -16,14 +16,16 @@ type InhooksConfigService interface { FindFlowForSource(sourceSlug string) *models.Flow GetFlow(flowID string) *models.Flow GetFlows() map[string]*models.Flow + GetTransformDefinition(transformID string) *models.TransformDefinition } type inhooksConfigService struct { - logger *zap.Logger - appConf *lib.AppConfig - inhooksConfig *models.InhooksConfig - flowsBySourceSlug map[string]*models.Flow - flowsByID map[string]*models.Flow + logger *zap.Logger + appConf *lib.AppConfig + inhooksConfig *models.InhooksConfig + flowsBySourceSlug map[string]*models.Flow + flowsByID map[string]*models.Flow + transformDefinitionsByID map[string]*models.TransformDefinition } func NewInhooksConfigService(logger *zap.Logger, appConf *lib.AppConfig) InhooksConfigService { @@ -60,6 +62,11 @@ func (s *inhooksConfigService) Load(filepath string) error { return errors.Wrapf(err, "failed to build flows map") } + err = s.initTransformDefinitionsMap() + if err != nil { + return errors.Wrapf(err, "failed to build transform definitions map") + } + s.log() return nil @@ -77,6 +84,10 @@ func (s *inhooksConfigService) GetFlows() map[string]*models.Flow { return s.flowsByID } +func (s *inhooksConfigService) GetTransformDefinition(transformID string) *models.TransformDefinition { + return s.transformDefinitionsByID[transformID] +} + func (s *inhooksConfigService) initFlowsMaps() error { s.flowsBySourceSlug = map[string]*models.Flow{} s.flowsByID = map[string]*models.Flow{} @@ -104,7 +115,24 @@ func (s *inhooksConfigService) initFlowsMaps() error { return nil } +func (s *inhooksConfigService) initTransformDefinitionsMap() error { + s.transformDefinitionsByID = map[string]*models.TransformDefinition{} + for _, transformDefinition := range s.inhooksConfig.TransformDefinitions { + s.transformDefinitionsByID[transformDefinition.ID] = transformDefinition + } + + return nil +} + func (s *inhooksConfigService) log() { + for _, transform := range s.inhooksConfig.TransformDefinitions { + s.logger.Info("loaded transform", + zap.String("id", transform.ID), + zap.String("type", string(transform.Type)), + zap.String("script", transform.Script), + ) + } + for _, f := range s.flowsByID { s.logger.Info("loaded flow", zap.String("id", f.ID), @@ -119,6 +147,7 @@ func (s *inhooksConfigService) log() { zap.String("type", string(sink.Type)), zap.String("url", string(sink.URL)), zap.Durationp("delay", sink.Delay), + zap.Any("transform", sink.Transform), ) } } diff --git a/pkg/services/message_processor.go b/pkg/services/message_processor.go index 163bba8..1452cef 100644 --- a/pkg/services/message_processor.go +++ b/pkg/services/message_processor.go @@ -12,7 +12,7 @@ import ( ) type MessageProcessor interface { - Process(ctx context.Context, sink *models.Sink, m *models.Message) error + Process(ctx context.Context, sink *models.Sink, m *models.Message, transformedPayload []byte) error } type messageProcessor struct { @@ -25,17 +25,17 @@ func NewMessageProcessor(httpClient *http.Client) MessageProcessor { } } -func (p *messageProcessor) Process(ctx context.Context, sink *models.Sink, m *models.Message) error { +func (p *messageProcessor) Process(ctx context.Context, sink *models.Sink, m *models.Message, transformedPayload []byte) error { switch sink.Type { case models.SinkTypeHttp: - return p.processHTTP(ctx, sink, m) + return p.processHTTP(ctx, sink, m, transformedPayload) default: return fmt.Errorf("unkown sink type %s", sink.Type) } } -func (p *messageProcessor) processHTTP(ctx context.Context, sink *models.Sink, m *models.Message) error { - buf := bytes.NewBuffer(m.Payload) +func (p *messageProcessor) processHTTP(ctx context.Context, sink *models.Sink, m *models.Message, transformedPayload []byte) error { + buf := bytes.NewBuffer(transformedPayload) req, err := http.NewRequestWithContext(ctx, http.MethodPost, sink.URL, buf) if err != nil { diff --git a/pkg/services/message_processor_test.go b/pkg/services/message_processor_test.go index 08cb48c..5ea8332 100644 --- a/pkg/services/message_processor_test.go +++ b/pkg/services/message_processor_test.go @@ -20,6 +20,8 @@ func TestMessageProcessor(t *testing.T) { p := NewMessageProcessor(cl) payload := []byte(`{"id": "the-payload"}`) + transformedPayload := []byte(`{"id": "the-transformed-payload"}`) + headers := http.Header{ "X-Key": []string{"123"}, "User-Agent": []string{"Sender-User-Agent"}, @@ -31,14 +33,14 @@ func TestMessageProcessor(t *testing.T) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { body, err := io.ReadAll(req.Body) assert.NoError(t, err) - assert.Equal(t, payload, body) + assert.Equal(t, transformedPayload, body) assert.Equal(t, rawQuery, req.URL.RawQuery) assert.Equal(t, http.Header{ "X-Key": []string{"123"}, "User-Agent": []string{"Inhooks/test (https://github.com/didil/inhooks)"}, - "Content-Length": []string{"21"}, + "Content-Length": []string{"33"}, "Accept-Encoding": []string{"*"}, }, req.Header) })) @@ -55,7 +57,7 @@ func TestMessageProcessor(t *testing.T) { Payload: payload, } - err := p.Process(ctx, sink, m) + err := p.Process(ctx, sink, m, transformedPayload) assert.NoError(t, err) } diff --git a/pkg/services/payload_transformer.go b/pkg/services/payload_transformer.go new file mode 100644 index 0000000..59bdba3 --- /dev/null +++ b/pkg/services/payload_transformer.go @@ -0,0 +1,22 @@ +package services + +import ( + "context" + + "github.com/didil/inhooks/pkg/models" +) + +type PayloadTransformer interface { + Transform(ctx context.Context, transformDefinition *models.TransformDefinition, payload []byte) ([]byte, error) +} + +type payloadTransformer struct { +} + +func NewPayloadTransformer() PayloadTransformer { + return &payloadTransformer{} +} + +func (p *payloadTransformer) Transform(ctx context.Context, transformDefinition *models.TransformDefinition, payload []byte) ([]byte, error) { + return payload, nil +} diff --git a/pkg/supervisor/ready.go b/pkg/supervisor/ready.go index ec02a95..fcadca2 100644 --- a/pkg/supervisor/ready.go +++ b/pkg/supervisor/ready.go @@ -79,10 +79,22 @@ func (s *Supervisor) startReadyProcessor(ctx context.Context, f *models.Flow, si zap.String("ingestedReqID", m.IngestedReqID), ) + transformDefinition := s.inhooksConfigSvc.GetTransformDefinition(sink.Transform.ID) + if transformDefinition == nil { + logger.Error("transform definition not found", zap.String("transformID", sink.Transform.ID)) + continue + } + + transformedPayload, err := s.payloadTransformer.Transform(ctx, transformDefinition, m.Payload) + if err != nil { + logger.Error("failed to transform payload", zap.Error(err)) + continue + } + logger.Info("processing message", zap.Int("attempt#", len(m.DeliveryAttempts)+1)) messageProcessingAttemptsCounter.Inc() - processingErr := s.messageProcessor.Process(ctx, sink, m) + processingErr := s.messageProcessor.Process(ctx, sink, m, transformedPayload) if processingErr != nil { logger.Info("message processing failed", zap.Error(processingErr)) messageProcessingFailureCounter.Inc() diff --git a/pkg/supervisor/ready_test.go b/pkg/supervisor/ready_test.go index 8f3fc5f..926922f 100644 --- a/pkg/supervisor/ready_test.go +++ b/pkg/supervisor/ready_test.go @@ -28,9 +28,13 @@ func TestSupervisorHandleReadyQueue_OK(t *testing.T) { flowId1 := "flow-1" sinkID1 := "sink-1" + transformID1 := "transform-1" sink1 := &models.Sink{ ID: sinkID1, + Transform: &models.Transform{ + ID: transformID1, + }, } flow1 := &models.Flow{ @@ -41,12 +45,23 @@ func TestSupervisorHandleReadyQueue_OK(t *testing.T) { mID1 := "message-1" m := &models.Message{ - ID: mID1, + ID: mID1, + Payload: []byte(`{"id": "the-payload"}`), + } + + transformedPayload := []byte(`{"id": "the-transformed-payload"}`) + + transformDefinition := &models.TransformDefinition{ + ID: "transform-definition-1", + Type: models.TransformTypeLua, + Script: "some script", } messageFetcher := mocks.NewMockMessageFetcher(ctrl) + payloadTransformer := mocks.NewMockPayloadTransformer(ctrl) messageProcessor := mocks.NewMockMessageProcessor(ctrl) processingResultsService := mocks.NewMockProcessingResultsService(ctrl) + inhooksConfigService := mocks.NewMockInhooksConfigService(ctrl) logger, err := zap.NewDevelopment() assert.NoError(t, err) @@ -56,6 +71,8 @@ func TestSupervisorHandleReadyQueue_OK(t *testing.T) { WithMessageProcessor(messageProcessor), WithProcessingResultsService(processingResultsService), WithAppConfig(appConf), + WithPayloadTransformer(payloadTransformer), + WithInhooksConfigService(inhooksConfigService), WithLogger(logger), ) @@ -70,12 +87,26 @@ func TestSupervisorHandleReadyQueue_OK(t *testing.T) { return m, nil } + // simulate blocking + time.Sleep(100 * time.Millisecond) return nil, nil }) + inhooksConfigService.EXPECT(). + GetTransformDefinition(transformID1). + DoAndReturn(func(sinkID string) *models.TransformDefinition { + return transformDefinition + }) + + payloadTransformer.EXPECT(). + Transform(gomock.Any(), transformDefinition, m.Payload). + DoAndReturn(func(ctx context.Context, transformDefinition *models.TransformDefinition, payload []byte) ([]byte, error) { + return transformedPayload, nil + }) + messageProcessor.EXPECT(). - Process(gomock.Any(), sink1, m). - DoAndReturn(func(ctx context.Context, sink *models.Sink, m *models.Message) error { + Process(gomock.Any(), sink1, m, transformedPayload). + DoAndReturn(func(ctx context.Context, sink *models.Sink, m *models.Message, transformedPayload []byte) error { return nil }) @@ -101,9 +132,13 @@ func TestSupervisorHandleReadyQueue_Failed(t *testing.T) { flowId1 := "flow-1" sinkID1 := "sink-1" + transformID1 := "transform-1" sink1 := &models.Sink{ ID: sinkID1, + Transform: &models.Transform{ + ID: "transform-1", + }, } flow1 := &models.Flow{ @@ -117,9 +152,19 @@ func TestSupervisorHandleReadyQueue_Failed(t *testing.T) { ID: mID1, } + transformedPayload := []byte(`{"id": "the-transformed-payload"}`) + + transformDefinition := &models.TransformDefinition{ + ID: "transform-definition-1", + Type: models.TransformTypeLua, + Script: "some script", + } + messageFetcher := mocks.NewMockMessageFetcher(ctrl) messageProcessor := mocks.NewMockMessageProcessor(ctrl) processingResultsService := mocks.NewMockProcessingResultsService(ctrl) + payloadTransformer := mocks.NewMockPayloadTransformer(ctrl) + inhooksConfigService := mocks.NewMockInhooksConfigService(ctrl) logger, err := zap.NewDevelopment() assert.NoError(t, err) @@ -127,6 +172,8 @@ func TestSupervisorHandleReadyQueue_Failed(t *testing.T) { s := NewSupervisor( WithMessageFetcher(messageFetcher), WithMessageProcessor(messageProcessor), + WithPayloadTransformer(payloadTransformer), + WithInhooksConfigService(inhooksConfigService), WithProcessingResultsService(processingResultsService), WithAppConfig(appConf), WithLogger(logger), @@ -147,9 +194,17 @@ func TestSupervisorHandleReadyQueue_Failed(t *testing.T) { return nil, nil }) + inhooksConfigService.EXPECT(). + GetTransformDefinition(transformID1). + Return(transformDefinition) + + payloadTransformer.EXPECT(). + Transform(gomock.Any(), transformDefinition, m.Payload). + Return(transformedPayload, nil) + messageProcessor.EXPECT(). - Process(gomock.Any(), sink1, m). - DoAndReturn(func(ctx context.Context, sink *models.Sink, m *models.Message) error { + Process(gomock.Any(), sink1, m, transformedPayload). + DoAndReturn(func(ctx context.Context, sink *models.Sink, m *models.Message, transformedPayload []byte) error { return processingErr }) diff --git a/pkg/supervisor/supervisor.go b/pkg/supervisor/supervisor.go index 115aa0a..0cfb752 100644 --- a/pkg/supervisor/supervisor.go +++ b/pkg/supervisor/supervisor.go @@ -21,6 +21,7 @@ type Supervisor struct { schedulerSvc services.SchedulerService processingRecoverySvc services.ProcessingRecoveryService cleanupSvc services.CleanupService + payloadTransformer services.PayloadTransformer } type SupervisorOpt func(s *Supervisor) @@ -93,6 +94,12 @@ func WithCleanupService(cleanupSvc services.CleanupService) SupervisorOpt { } } +func WithPayloadTransformer(payloadTransformer services.PayloadTransformer) SupervisorOpt { + return func(s *Supervisor) { + s.payloadTransformer = payloadTransformer + } +} + func (s *Supervisor) Start() { wg := &sync.WaitGroup{} flows := s.inhooksConfigSvc.GetFlows() diff --git a/pkg/testsupport/mocks/gen_mocks.sh b/pkg/testsupport/mocks/gen_mocks.sh index 6153d8f..14b0ee6 100755 --- a/pkg/testsupport/mocks/gen_mocks.sh +++ b/pkg/testsupport/mocks/gen_mocks.sh @@ -14,6 +14,7 @@ services=( "processing_recovery_service" "cleanup_service" "message_verifier" + "payload_transformer" ) for service in ${services[@]} diff --git a/pkg/testsupport/mocks/mock_inhooks_config_service.go b/pkg/testsupport/mocks/mock_inhooks_config_service.go index 4c03c6b..79b3ad4 100644 --- a/pkg/testsupport/mocks/mock_inhooks_config_service.go +++ b/pkg/testsupport/mocks/mock_inhooks_config_service.go @@ -76,6 +76,20 @@ func (mr *MockInhooksConfigServiceMockRecorder) GetFlows() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFlows", reflect.TypeOf((*MockInhooksConfigService)(nil).GetFlows)) } +// GetTransformDefinition mocks base method. +func (m *MockInhooksConfigService) GetTransformDefinition(transformID string) *models.TransformDefinition { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTransformDefinition", transformID) + ret0, _ := ret[0].(*models.TransformDefinition) + return ret0 +} + +// GetTransformDefinition indicates an expected call of GetTransformDefinition. +func (mr *MockInhooksConfigServiceMockRecorder) GetTransformDefinition(transformID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTransformDefinition", reflect.TypeOf((*MockInhooksConfigService)(nil).GetTransformDefinition), transformID) +} + // Load mocks base method. func (m *MockInhooksConfigService) Load(path string) error { m.ctrl.T.Helper() diff --git a/pkg/testsupport/mocks/mock_message_processor.go b/pkg/testsupport/mocks/mock_message_processor.go index 0853b98..3f0620b 100644 --- a/pkg/testsupport/mocks/mock_message_processor.go +++ b/pkg/testsupport/mocks/mock_message_processor.go @@ -36,15 +36,15 @@ func (m *MockMessageProcessor) EXPECT() *MockMessageProcessorMockRecorder { } // Process mocks base method. -func (m_2 *MockMessageProcessor) Process(ctx context.Context, sink *models.Sink, m *models.Message) error { +func (m_2 *MockMessageProcessor) Process(ctx context.Context, sink *models.Sink, m *models.Message, transformedPayload []byte) error { m_2.ctrl.T.Helper() - ret := m_2.ctrl.Call(m_2, "Process", ctx, sink, m) + ret := m_2.ctrl.Call(m_2, "Process", ctx, sink, m, transformedPayload) ret0, _ := ret[0].(error) return ret0 } // Process indicates an expected call of Process. -func (mr *MockMessageProcessorMockRecorder) Process(ctx, sink, m interface{}) *gomock.Call { +func (mr *MockMessageProcessorMockRecorder) Process(ctx, sink, m, transformedPayload interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Process", reflect.TypeOf((*MockMessageProcessor)(nil).Process), ctx, sink, m) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Process", reflect.TypeOf((*MockMessageProcessor)(nil).Process), ctx, sink, m, transformedPayload) } diff --git a/pkg/testsupport/mocks/mock_payload_transformer.go b/pkg/testsupport/mocks/mock_payload_transformer.go new file mode 100644 index 0000000..6872f11 --- /dev/null +++ b/pkg/testsupport/mocks/mock_payload_transformer.go @@ -0,0 +1,51 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/services/payload_transformer.go + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + models "github.com/didil/inhooks/pkg/models" + gomock "github.com/golang/mock/gomock" +) + +// MockPayloadTransformer is a mock of PayloadTransformer interface. +type MockPayloadTransformer struct { + ctrl *gomock.Controller + recorder *MockPayloadTransformerMockRecorder +} + +// MockPayloadTransformerMockRecorder is the mock recorder for MockPayloadTransformer. +type MockPayloadTransformerMockRecorder struct { + mock *MockPayloadTransformer +} + +// NewMockPayloadTransformer creates a new mock instance. +func NewMockPayloadTransformer(ctrl *gomock.Controller) *MockPayloadTransformer { + mock := &MockPayloadTransformer{ctrl: ctrl} + mock.recorder = &MockPayloadTransformerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPayloadTransformer) EXPECT() *MockPayloadTransformerMockRecorder { + return m.recorder +} + +// Transform mocks base method. +func (m *MockPayloadTransformer) Transform(ctx context.Context, transformDefinition *models.TransformDefinition, payload []byte) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Transform", ctx, transformDefinition, payload) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Transform indicates an expected call of Transform. +func (mr *MockPayloadTransformerMockRecorder) Transform(ctx, transformDefinition, payload interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Transform", reflect.TypeOf((*MockPayloadTransformer)(nil).Transform), ctx, transformDefinition, payload) +}