diff --git a/.env.example b/.env.example index 94233ea..785eb9c 100644 --- a/.env.example +++ b/.env.example @@ -3,3 +3,4 @@ HOST=localhost PORT=3000 REDIS_URL=redis://localhost:6379 REDIS_INHOOKS_DB_NAME=development +TRANSFORM_JAVASCRIPT_TIMEOUT=1s \ No newline at end of file diff --git a/.env.test.example b/.env.test.example index a19a196..2dba140 100644 --- a/.env.test.example +++ b/.env.test.example @@ -2,4 +2,4 @@ APP_ENV=test HOST=localhost PORT=3001 REDIS_URL=redis://localhost:6379 -REDIS_INHOOKS_DB_NAME=test \ No newline at end of file +REDIS_INHOOKS_DB_NAME=test diff --git a/README.md b/README.md index f423849..004b9e3 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ Inhooks listens to HTTP webhooks and saves the messages to Redis. A processing m - Supports delayed processing - Supports retries on failure with configurable number of attempts, interval and constant or exponential backoff - Supports different HTTP payloads types: JSON, x-www-form-urlencoded, multipart/form-data +- Supports message transformation using JavaScript ECMAScript 5.1 - ... more features planned ## Usage @@ -79,6 +80,82 @@ flows: currentSecretEnvVar: VERIFICATION_FLOW_1_CURRENT_SECRET # the name of the environment variable containing the verification secret previousSecretEnvVar: VERIFICATION_FLOW_1_PREVIOUS_SECRET # optional env var that allows rotating secrets without service interruption ``` + +### Message transformation + +#### Transform definition + +Message transformation allows you to modify the payload and headers of messages before they are sent to the sinks (destinations). This can be useful for tasks such as adding or removing fields, changing the format of the data, or adding custom headers. + +Currently, only JavaScript transformations are supported. The JavaScript function should be named `transform` and should take two parameters: `bodyStr` (the message body as a string) and `headers` (the message headers as a JSON object). The function should return an array with two elements: the transformed payload as a string and the transformed headers as a JSON object. +The `headers` fields has the following format: +``` +{ + "header-name": ["value1", "value2"] +} +``` + +Only JavaScript ECMAScript 5.1 is supported at the moment. We use the [goja](https://github.com/dop251/goja) library to execute the JavaScript code. You can read about the limitations on goja's documentation pages. + +Here is an example configuration: +```yaml +flows: + - id: flow-1 + source: + id: source-1 + slug: source-1-slug + type: http + sinks: + - id: sink-1 + type: http + url: https://example.com/target + transform: + id: js-transform-1 +transform_definitions: + - id: js-transform-1 + type: javascript + script: | + function transform(bodyStr, headers) { + const body = JSON.parse(bodyStr); + + // add a header + headers["X-INHOOKS-TRANSFORMED"] = ["1"]; + // capitalize the message if present + if (body.msg) { + body.msg = body.msg.toUpperCase(); + } + // delete a key from the body + delete body.my_dummy_key; + + return [JSON.stringify(body), headers]; + } +``` + + +#### Testing transform scripts + +You can use the `/api/v1/transform` endpoint to test your transform scripts before adding them to your flow configuration. This endpoint allows you to simulate the transformation process and see the results immediately. + +To use this endpoint, send a POST request with a JSON payload containing the following fields: +- `body`: The message body as a string +- `headers`: The message headers as a JSON object +- `transformDefinition`: An object containing the `type` and `script` of your transformation + +Here's an example of how to use the `/api/v1/transform` endpoint: +```shell +curl -X POST http://localhost:3000/api/v1/transform \ + -H "Content-Type: application/json" \ + -d '{ + "body": "{\"msg\": \"hello world\", \"my_dummy_key\": \"value\"}", + "headers": {"Content-Type": ["application/json"]}, + "transformDefinition": { + "type": "javascript", + "script": "function transform(bodyStr, headers) { const body = JSON.parse(bodyStr); headers[\"X-INHOOKS-TRANSFORMED\"] = [\"1\"]; if (body.msg) { body.msg = body.msg.toUpperCase(); } delete body.my_dummy_key; return [JSON.stringify(body), headers]; }" + } + }' +``` + + ### Prometheus metrics Inhooks exposes Prometheus metrics at the `/api/v1/metrics` endpoint. diff --git a/cmd/api/main.go b/cmd/api/main.go index 83b24e8..b15138b 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "flag" "fmt" "log" "net/http" @@ -26,6 +27,15 @@ var ( func main() { versionpkg.SetVersion(version) + // handle version command + isVersionCmd := flag.Bool("version", false, "print the version") + flag.Parse() + if *isVersionCmd { + fmt.Println(version) + os.Exit(0) + } + + // start server err := lib.LoadEnv() if err != nil { log.Fatalf("failed to load env: %v", err) @@ -41,6 +51,8 @@ func main() { log.Fatalf("failed to initialize logger: %v", err) } + logger.Info("starting Inhooks", zap.String("version", version)) + inhooksConfigSvc := services.NewInhooksConfigService(logger, appConf) logger.Info("loading inhooks config", zap.String("inhooksConfigFile", appConf.InhooksConfigFile)) @@ -65,6 +77,7 @@ func main() { messageEnqueuer := services.NewMessageEnqueuer(redisStore, timeSvc) messageFetcher := services.NewMessageFetcher(redisStore, timeSvc) messageVerifier := services.NewMessageVerifier() + messageTransformer := services.NewMessageTransformer(&appConf.Transform) app := handlers.NewApp( handlers.WithLogger(logger), @@ -72,6 +85,7 @@ func main() { handlers.WithMessageBuilder(messageBuilder), handlers.WithMessageEnqueuer(messageEnqueuer), handlers.WithMessageVerifier(messageVerifier), + handlers.WithMessageTransformer(messageTransformer), ) r := server.NewRouter(app) @@ -119,6 +133,7 @@ func main() { supervisor.WithSchedulerService(schedulerSvc), supervisor.WithProcessingRecoveryService(processingRecoverySvc), supervisor.WithCleanupService(cleanupSvc), + supervisor.WithMessageTransformer(messageTransformer), ) wg.Add(1) diff --git a/go.mod b/go.mod index 0e150a4..ecdcbec 100644 --- a/go.mod +++ b/go.mod @@ -49,6 +49,8 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/denis-tingaikin/go-header v0.4.3 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/dlclark/regexp2 v1.7.0 // indirect + github.com/dop251/goja v0.0.0-20240627195025-eb1f15ee67d2 // indirect github.com/dustin/go-humanize v1.0.0 // indirect github.com/esimonov/ifshort v1.0.4 // indirect github.com/ettle/strcase v0.1.1 // indirect @@ -58,6 +60,7 @@ require ( github.com/fsnotify/fsnotify v1.5.4 // indirect github.com/fzipp/gocyclo v0.6.0 // indirect github.com/go-critic/go-critic v0.7.0 // indirect + github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect github.com/go-toolsmith/astcast v1.1.0 // indirect github.com/go-toolsmith/astcopy v1.1.0 // indirect github.com/go-toolsmith/astequal v1.1.0 // indirect @@ -79,6 +82,7 @@ require ( github.com/golangci/revgrep v0.0.0-20220804021717-745bb2f7c2e6 // indirect github.com/golangci/unconvert v0.0.0-20180507085042-28b1c447d1f4 // indirect github.com/google/go-cmp v0.6.0 // indirect + github.com/google/pprof v0.0.0-20230207041349-798e818bf904 // indirect github.com/gordonklaus/ineffassign v0.0.0-20230107090616-13ace0543b28 // indirect github.com/gostaticanalysis/analysisutil v0.7.1 // indirect github.com/gostaticanalysis/comment v1.4.2 // indirect diff --git a/go.sum b/go.sum index 194dacd..efd1b6f 100644 --- a/go.sum +++ b/go.sum @@ -122,6 +122,10 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczC github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/dlclark/regexp2 v1.7.0 h1:7lJfhqlPssTb1WQx4yvTHN0uElPEv52sbaECrAQxjAo= +github.com/dlclark/regexp2 v1.7.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= +github.com/dop251/goja v0.0.0-20240627195025-eb1f15ee67d2 h1:4Ew88p5s9dwIk5/woUyqI9BD89NgZoUNH4/rM/h2UDg= +github.com/dop251/goja v0.0.0-20240627195025-eb1f15ee67d2/go.mod h1:o31y53rb/qiIAONF7w3FHJZRqqP3fzHUr1HqanthByw= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -159,6 +163,8 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= +github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU= +github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-toolsmith/astcast v1.1.0 h1:+JN9xZV1A+Re+95pgnMgDboWNVnIMMQXwfBwLRPgSC8= github.com/go-toolsmith/astcast v1.1.0/go.mod h1:qdcuFWeGGS2xX5bLM/c3U9lewg7+Zu4mr+xPwZIB4ZU= @@ -264,6 +270,8 @@ github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hf github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20230207041349-798e818bf904 h1:4/hN5RUoecvl+RmJRE2YxKWtnnQls6rQjjW5oV7qg2U= +github.com/google/pprof v0.0.0-20230207041349-798e818bf904/go.mod h1:uglQLonpP8qtYCYyzA+8c/9qtqgA3qsXGYqCPKARAFg= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= diff --git a/pkg/lib/config.go b/pkg/lib/config.go index 92208ba..898d4f1 100644 --- a/pkg/lib/config.go +++ b/pkg/lib/config.go @@ -15,6 +15,7 @@ type AppConfig struct { Supervisor SupervisorConfig HTTPClient HTTPClientConfig Sink SinkConfig + Transform TransformConfig } type ServerConfig struct { @@ -57,6 +58,10 @@ type SinkConfig struct { DefaultRetryExpMultiplier float64 `env:"SINK_DEFAULT_RETRY_EXP_MULTIPLIER,default=1"` } +type TransformConfig struct { + JavascriptTimeout time.Duration `env:"TRANSFORM_JAVASCRIPT_TIMEOUT,default=1s"` +} + func InitAppConfig(ctx context.Context) (*AppConfig, error) { appConf := &AppConfig{} err := envconfig.Process(ctx, appConf) diff --git a/pkg/models/inhooks_config.go b/pkg/models/inhooks_config.go index 49424c6..f7c0c40 100644 --- a/pkg/models/inhooks_config.go +++ b/pkg/models/inhooks_config.go @@ -10,7 +10,8 @@ import ( ) type InhooksConfig struct { - Flows []*Flow `yaml:"flows"` + Flows []*Flow `yaml:"flows"` + TransformDefinitions []*TransformDefinition `yaml:"transform_definitions"` } var idRegex = regexp.MustCompile(`^[a-zA-Z0-9\-]{1,255}$`) @@ -27,6 +28,28 @@ func ValidateInhooksConfig(appConf *lib.AppConfig, c *InhooksConfig) error { flowIDs := map[string]bool{} sourceSlugs := map[string]bool{} + transformIDs := map[string]bool{} + + if c.TransformDefinitions != nil { + for i, transform := range c.TransformDefinitions { + if !slices.Contains(TransformTypes, transform.Type) { + return fmt.Errorf("invalid transform type: %s. allowed: %v", transform.Type, TransformTypes) + } + + if !idRegex.MatchString(transform.ID) { + return idValidationErr(fmt.Sprintf("transforms[%d].id", i)) + } + + if transformIDs[transform.ID] { + return fmt.Errorf("transform ids must be unique. duplicate transform id: %s", transform.ID) + } + transformIDs[transform.ID] = true + + if transform.Script == "" { + return fmt.Errorf("transform script cannot be empty") + } + } + } for i, f := range c.Flows { if !idRegex.MatchString(f.ID) { @@ -119,6 +142,13 @@ func ValidateInhooksConfig(appConf *lib.AppConfig, c *InhooksConfig) error { return fmt.Errorf("invalid url scheme: %s", sink.URL) } } + + // validate transform + if sink.Transform != nil { + if !transformIDs[sink.Transform.ID] { + return fmt.Errorf("transform id not found: %s", sink.Transform.ID) + } + } } } diff --git a/pkg/models/inhooks_config_test.go b/pkg/models/inhooks_config_test.go index acfd72d..322fcdb 100644 --- a/pkg/models/inhooks_config_test.go +++ b/pkg/models/inhooks_config_test.go @@ -32,6 +32,9 @@ func TestValidateInhooksConfig_OK(t *testing.T) { Type: "http", URL: "https://example.com/sink", Delay: &delay, + Transform: &Transform{ + ID: "js-transform-1", + }, }, }, }, @@ -57,6 +60,13 @@ func TestValidateInhooksConfig_OK(t *testing.T) { }, }, }, + TransformDefinitions: []*TransformDefinition{ + { + ID: "js-transform-1", + Type: TransformTypeJavascript, + Script: "function transform(data) data.username = data.name end", + }, + }, } assert.NoError(t, ValidateInhooksConfig(appConf, c)) @@ -359,3 +369,116 @@ func TestValidateInhooksConfig_InvalidHMACAlgorithm(t *testing.T) { assert.ErrorContains(t, ValidateInhooksConfig(appConf, c), "invalid hmac algorithm: somealgorithm. allowed: [sha256]") } +func TestValidateInhooksConfig_InexistingTransformID(t *testing.T) { + ctx := context.Background() + appConf, err := testsupport.InitAppConfig(ctx) + assert.NoError(t, err) + + c := &InhooksConfig{ + Flows: []*Flow{ + { + ID: "flow-1", + Source: &Source{ + ID: "source-1", + Slug: "source-1-slug", + Type: "http", + }, + Sinks: []*Sink{ + { + ID: "sink-1", + Type: "http", + URL: "https://example.com/sink", + Transform: &Transform{ + ID: "non-existent-transform", + }, + }, + }, + }, + }, + TransformDefinitions: []*TransformDefinition{ + { + ID: "js-transform-1", + Type: TransformTypeJavascript, + Script: "function transform(data) end", + }, + }, + } + + assert.ErrorContains(t, ValidateInhooksConfig(appConf, c), "transform id not found: non-existent-transform") +} + +func TestValidateInhooksConfig_InvalidTransformType(t *testing.T) { + ctx := context.Background() + appConf, err := testsupport.InitAppConfig(ctx) + assert.NoError(t, err) + + c := &InhooksConfig{ + Flows: []*Flow{ + { + ID: "flow-1", + Source: &Source{ + ID: "source-1", + Slug: "source-1-slug", + Type: "http", + }, + Sinks: []*Sink{ + { + ID: "sink-1", + Type: "http", + URL: "https://example.com/sink", + Transform: &Transform{ + ID: "some-transform-1", + }, + }, + }, + }, + }, + TransformDefinitions: []*TransformDefinition{ + { + ID: "some-transform-1", + Type: "invalid-type", + Script: "function transform(data) end", + }, + }, + } + + assert.ErrorContains(t, ValidateInhooksConfig(appConf, c), "invalid transform type: invalid-type. allowed: [javascript]") +} + +func TestValidateInhooksConfig_EmptyTransformScript(t *testing.T) { + ctx := context.Background() + appConf, err := testsupport.InitAppConfig(ctx) + assert.NoError(t, err) + + c := &InhooksConfig{ + Flows: []*Flow{ + { + ID: "flow-1", + Source: &Source{ + ID: "source-1", + Slug: "source-1-slug", + Type: "http", + }, + Sinks: []*Sink{ + { + ID: "sink-1", + Type: "http", + URL: "https://example.com/sink", + Transform: &Transform{ + ID: "js-transform-1", + }, + }, + }, + }, + }, + TransformDefinitions: []*TransformDefinition{ + { + ID: "js-transform-1", + Type: TransformTypeJavascript, + Script: "", + }, + }, + } + + assert.ErrorContains(t, ValidateInhooksConfig(appConf, c), "transform script cannot be empty") +} diff --git a/pkg/models/sink.go b/pkg/models/sink.go index 4cc5f42..f96e1a8 100644 --- a/pkg/models/sink.go +++ b/pkg/models/sink.go @@ -27,4 +27,6 @@ type Sink struct { RetryExpMultiplier *float64 `yaml:"retryExpMultiplier"` // Max attempts MaxAttempts *int `yaml:"maxAttempts"` + // Transform to apply to the data + Transform *Transform `yaml:"transform"` } diff --git a/pkg/models/transform.go b/pkg/models/transform.go new file mode 100644 index 0000000..1a4d4a1 --- /dev/null +++ b/pkg/models/transform.go @@ -0,0 +1,21 @@ +package models + +type TransformType string + +const ( + TransformTypeJavascript = "javascript" +) + +var TransformTypes = []TransformType{ + TransformTypeJavascript, +} + +type TransformDefinition struct { + ID string `yaml:"id"` + Type TransformType `yaml:"type"` + Script string `yaml:"script"` +} + +type Transform struct { + ID string `yaml:"id"` +} diff --git a/pkg/server/handlers/app.go b/pkg/server/handlers/app.go index 3b32500..ee6fda6 100644 --- a/pkg/server/handlers/app.go +++ b/pkg/server/handlers/app.go @@ -9,11 +9,12 @@ import ( ) type App struct { - logger *zap.Logger - inhooksConfigSvc services.InhooksConfigService - messageBuilder services.MessageBuilder - messageEnqueuer services.MessageEnqueuer - messageVerifier services.MessageVerifier + logger *zap.Logger + inhooksConfigSvc services.InhooksConfigService + messageBuilder services.MessageBuilder + messageEnqueuer services.MessageEnqueuer + messageVerifier services.MessageVerifier + messageTransformer services.MessageTransformer } type AppOpt func(app *App) @@ -58,6 +59,12 @@ func WithMessageVerifier(messageVerifier services.MessageVerifier) AppOpt { } } +func WithMessageTransformer(messageTransformer services.MessageTransformer) AppOpt { + return func(app *App) { + app.messageTransformer = messageTransformer + } +} + type JSONErr struct { Error string `json:"error"` ReqID string `json:"reqID,omitempty"` diff --git a/pkg/server/handlers/metrics.go b/pkg/server/handlers/metrics.go index b855b25..1b94d71 100644 --- a/pkg/server/handlers/metrics.go +++ b/pkg/server/handlers/metrics.go @@ -1,10 +1,19 @@ package handlers import ( - "github.com/prometheus/client_golang/prometheus/promhttp" "net/http" + + "github.com/go-chi/chi/v5/middleware" + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.uber.org/zap" ) func (app *App) HandleMetrics(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + reqID := middleware.GetReqID(ctx) + logger := app.logger.With(zap.String("reqID", reqID)) + + logger.Info("new metrics request") promhttp.Handler().ServeHTTP(w, r) + logger.Info("metrics request succeeded") } diff --git a/pkg/server/handlers/transform.go b/pkg/server/handlers/transform.go new file mode 100644 index 0000000..7df7516 --- /dev/null +++ b/pkg/server/handlers/transform.go @@ -0,0 +1,66 @@ +package handlers + +import ( + "encoding/json" + "fmt" + "net/http" + + "github.com/didil/inhooks/pkg/models" + "github.com/go-chi/chi/v5/middleware" + "go.uber.org/zap" +) + +type TransformRequest struct { + Body string `json:"body"` + Headers map[string][]string `json:"headers"` + TransformDefinition *models.TransformDefinition `json:"transformDefinition"` +} + +type TransformResponse struct { + Body string `json:"body"` + Headers map[string][]string `json:"headers"` +} + +func (app *App) HandleTransform(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + reqID := middleware.GetReqID(ctx) + logger := app.logger.With(zap.String("reqID", reqID)) + + logger.Info("new transform request") + + transformRequest := &TransformRequest{} + err := json.NewDecoder(r.Body).Decode(transformRequest) + if err != nil { + logger.Error("transform request failed: unable to decode request body", zap.Error(err)) + app.WriteJSONErr(w, http.StatusBadRequest, reqID, fmt.Errorf("unable to decode request body")) + return + } + + if transformRequest.TransformDefinition == nil { + logger.Error("transform request failed: transform definition is required") + app.WriteJSONErr(w, http.StatusBadRequest, reqID, fmt.Errorf("transform definition is required")) + return + } + + m := &models.Message{ + Payload: []byte(transformRequest.Body), + HttpHeaders: transformRequest.Headers, + } + + err = app.messageTransformer.Transform(ctx, transformRequest.TransformDefinition, m) + if err != nil { + logger.Error("transform request failed: unable to transform message", zap.Error(err)) + app.WriteJSONErr(w, http.StatusInternalServerError, reqID, fmt.Errorf("unable to transform message")) + return + } + + transformResponse := &TransformResponse{ + Body: string(m.Payload), + Headers: m.HttpHeaders, + } + + app.WriteJSONResponse(w, http.StatusOK, transformResponse) + + logger.Info("transform request succeeded") + +} diff --git a/pkg/server/handlers/transform_test.go b/pkg/server/handlers/transform_test.go new file mode 100644 index 0000000..e2dbe73 --- /dev/null +++ b/pkg/server/handlers/transform_test.go @@ -0,0 +1,134 @@ +package handlers_test + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/didil/inhooks/pkg/models" + "github.com/didil/inhooks/pkg/server" + "github.com/didil/inhooks/pkg/server/handlers" + "github.com/didil/inhooks/pkg/testsupport/mocks" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" +) + +func TestHandleTransform(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + logger, err := zap.NewDevelopment() + assert.NoError(t, err) + + messageTransformer := mocks.NewMockMessageTransformer(ctrl) + + app := handlers.NewApp( + handlers.WithLogger(logger), + handlers.WithMessageTransformer(messageTransformer), + ) + r := server.NewRouter(app) + s := httptest.NewServer(r) + defer s.Close() + + transformDefinition := &models.TransformDefinition{ + Type: models.TransformTypeJavascript, + Script: "function transform(bodyStr, headers) { return [bodyStr.toUpperCase(), headers]; }", + } + + requestBody := handlers.TransformRequest{ + Body: "hello world", + Headers: map[string][]string{"Content-Type": {"text/plain"}}, + TransformDefinition: transformDefinition, + } + + messageTransformer.EXPECT().Transform( + gomock.Any(), + transformDefinition, + gomock.Any(), + ).DoAndReturn(func(ctx interface{}, td *models.TransformDefinition, m *models.Message) error { + m.Payload = []byte("HELLO WORLD") + return nil + }) + + buf, err := json.Marshal(requestBody) + assert.NoError(t, err) + + req, err := http.NewRequest(http.MethodPost, s.URL+"/api/v1/transform", bytes.NewBuffer(buf)) + assert.NoError(t, err) + + resp, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) + + var transformResponse handlers.TransformResponse + err = json.NewDecoder(resp.Body).Decode(&transformResponse) + assert.NoError(t, err) + + assert.Equal(t, "HELLO WORLD", transformResponse.Body) + assert.Equal(t, map[string][]string{"Content-Type": {"text/plain"}}, transformResponse.Headers) +} + +func TestHandleTransform_BadRequest(t *testing.T) { + logger, err := zap.NewDevelopment() + assert.NoError(t, err) + + app := handlers.NewApp( + handlers.WithLogger(logger), + ) + r := server.NewRouter(app) + s := httptest.NewServer(r) + defer s.Close() + + req, err := http.NewRequest(http.MethodPost, s.URL+"/api/v1/transform", bytes.NewBufferString("invalid json")) + assert.NoError(t, err) + + resp, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) + + var jsonErr handlers.JSONErr + err = json.NewDecoder(resp.Body).Decode(&jsonErr) + assert.NoError(t, err) + assert.Contains(t, jsonErr.Error, "unable to decode request body") +} + +func TestHandleTransform_MissingTransformDefinition(t *testing.T) { + logger, err := zap.NewDevelopment() + assert.NoError(t, err) + + app := handlers.NewApp( + handlers.WithLogger(logger), + ) + r := server.NewRouter(app) + s := httptest.NewServer(r) + defer s.Close() + + requestBody := handlers.TransformRequest{ + Body: "hello world", + Headers: map[string][]string{"Content-Type": {"text/plain"}}, + } + + buf, err := json.Marshal(requestBody) + assert.NoError(t, err) + + req, err := http.NewRequest(http.MethodPost, s.URL+"/api/v1/transform", bytes.NewBuffer(buf)) + assert.NoError(t, err) + + resp, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) + + var jsonErr handlers.JSONErr + err = json.NewDecoder(resp.Body).Decode(&jsonErr) + assert.NoError(t, err) + assert.Contains(t, jsonErr.Error, "transform definition is required") +} diff --git a/pkg/server/router.go b/pkg/server/router.go index 72e5ab3..8f0f6e8 100644 --- a/pkg/server/router.go +++ b/pkg/server/router.go @@ -12,6 +12,8 @@ func NewRouter(app *handlers.App) *chi.Mux { r.Use(middleware.Recoverer) r.Route("/api/v1", func(r chi.Router) { r.Post("/ingest/{sourceSlug}", app.HandleIngest) + + r.Post("/transform", app.HandleTransform) r.Get("/metrics", app.HandleMetrics) }) diff --git a/pkg/services/inhooks_config_service.go b/pkg/services/inhooks_config_service.go index 3c2d0f1..72cd4e7 100644 --- a/pkg/services/inhooks_config_service.go +++ b/pkg/services/inhooks_config_service.go @@ -16,14 +16,16 @@ type InhooksConfigService interface { FindFlowForSource(sourceSlug string) *models.Flow GetFlow(flowID string) *models.Flow GetFlows() map[string]*models.Flow + GetTransformDefinition(transformID string) *models.TransformDefinition } type inhooksConfigService struct { - logger *zap.Logger - appConf *lib.AppConfig - inhooksConfig *models.InhooksConfig - flowsBySourceSlug map[string]*models.Flow - flowsByID map[string]*models.Flow + logger *zap.Logger + appConf *lib.AppConfig + inhooksConfig *models.InhooksConfig + flowsBySourceSlug map[string]*models.Flow + flowsByID map[string]*models.Flow + transformDefinitionsByID map[string]*models.TransformDefinition } func NewInhooksConfigService(logger *zap.Logger, appConf *lib.AppConfig) InhooksConfigService { @@ -60,6 +62,11 @@ func (s *inhooksConfigService) Load(filepath string) error { return errors.Wrapf(err, "failed to build flows map") } + err = s.initTransformDefinitionsMap() + if err != nil { + return errors.Wrapf(err, "failed to build transform definitions map") + } + s.log() return nil @@ -77,6 +84,10 @@ func (s *inhooksConfigService) GetFlows() map[string]*models.Flow { return s.flowsByID } +func (s *inhooksConfigService) GetTransformDefinition(transformID string) *models.TransformDefinition { + return s.transformDefinitionsByID[transformID] +} + func (s *inhooksConfigService) initFlowsMaps() error { s.flowsBySourceSlug = map[string]*models.Flow{} s.flowsByID = map[string]*models.Flow{} @@ -104,7 +115,23 @@ func (s *inhooksConfigService) initFlowsMaps() error { return nil } +func (s *inhooksConfigService) initTransformDefinitionsMap() error { + s.transformDefinitionsByID = map[string]*models.TransformDefinition{} + for _, transformDefinition := range s.inhooksConfig.TransformDefinitions { + s.transformDefinitionsByID[transformDefinition.ID] = transformDefinition + } + + return nil +} + func (s *inhooksConfigService) log() { + for _, transform := range s.inhooksConfig.TransformDefinitions { + s.logger.Info("loaded transform", + zap.String("id", transform.ID), + zap.String("type", string(transform.Type)), + ) + } + for _, f := range s.flowsByID { s.logger.Info("loaded flow", zap.String("id", f.ID), @@ -119,6 +146,7 @@ func (s *inhooksConfigService) log() { zap.String("type", string(sink.Type)), zap.String("url", string(sink.URL)), zap.Durationp("delay", sink.Delay), + zap.Any("transform", sink.Transform), ) } } diff --git a/pkg/services/message_processor_test.go b/pkg/services/message_processor_test.go index 08cb48c..05f0940 100644 --- a/pkg/services/message_processor_test.go +++ b/pkg/services/message_processor_test.go @@ -20,6 +20,7 @@ func TestMessageProcessor(t *testing.T) { p := NewMessageProcessor(cl) payload := []byte(`{"id": "the-payload"}`) + headers := http.Header{ "X-Key": []string{"123"}, "User-Agent": []string{"Sender-User-Agent"}, diff --git a/pkg/services/message_transform_test.go b/pkg/services/message_transform_test.go new file mode 100644 index 0000000..a3bceec --- /dev/null +++ b/pkg/services/message_transform_test.go @@ -0,0 +1,89 @@ +package services + +import ( + "context" + "net/http" + "testing" + "time" + + "github.com/didil/inhooks/pkg/lib" + "github.com/didil/inhooks/pkg/models" + "github.com/stretchr/testify/assert" +) + +func TestMessageTransformer_Transform_Javascript(t *testing.T) { + config := &lib.TransformConfig{ + JavascriptTimeout: 5000 * time.Millisecond, + } + + mt := NewMessageTransformer(config) + + m := &models.Message{ + Payload: []byte(`{ + "name": "John Doe", + "age": 30, + "locations": ["New York", "London", "Tokyo"], + "scores": [85, 90, 78, 92] + }`), + HttpHeaders: http.Header{ + "Content-Type": []string{"application/json"}, + "X-Request-Id": []string{"123"}, + "Authorization": []string{"Bearer token123"}, + }, + } + transformDefinition := &models.TransformDefinition{ + Type: models.TransformTypeJavascript, + Script: ` + function transform(bodyStr, headers) { + const body = JSON.parse(bodyStr); + body.username = body.name; + delete body.name; + delete body.age; + body.location_count = body.locations.length; + body.average_score = body.scores.reduce((a, b) => a + b, 0) / body.scores.length; + headers["X-AUTH-TOKEN"] = [headers.Authorization[0].split(' ')[1]]; + delete headers.Authorization; + return [JSON.stringify(body), headers]; + } + `, + } + err := mt.Transform(context.Background(), transformDefinition, m) + assert.NoError(t, err) + + assert.JSONEq(t, `{"username":"John Doe","locations":["New York","London","Tokyo"],"scores":[85,90,78,92],"average_score":86.25,"location_count":3}`, string(m.Payload)) + assert.Equal(t, http.Header{"Content-Type": []string{"application/json"}, "X-AUTH-TOKEN": []string{"token123"}, "X-Request-Id": []string{"123"}}, m.HttpHeaders) +} + +func TestMessageTransformer_Transform_Javascript_Error(t *testing.T) { + config := &lib.TransformConfig{ + JavascriptTimeout: 5000 * time.Millisecond, + } + + mt := NewMessageTransformer(config) + + m := &models.Message{ + Payload: []byte(`{ + "name": "John Doe", + "age": 30, + "locations": ["New York", "London", "Tokyo"], + "scores": [85, 90, 78, 92] + }`), + HttpHeaders: http.Header{ + "Content-Type": []string{"application/json"}, + "X-Request-Id": []string{"123"}, + "Authorization": []string{"Bearer token123"}, + }, + } + transformDefinition := &models.TransformDefinition{ + Type: models.TransformTypeJavascript, + Script: ` + function transform(bodyStr, headers) { + const body = JSON.parse(bodyStr); + throw new Error("random error while in the transform function"); + return [JSON.stringify(body), headers]; + } + `, + } + err := mt.Transform(context.Background(), transformDefinition, m) + assert.ErrorContains(t, err, "failed to transform message: failed to execute JavaScript: Error: random error while in the transform function") +} diff --git a/pkg/services/message_transformer.go b/pkg/services/message_transformer.go new file mode 100644 index 0000000..652925e --- /dev/null +++ b/pkg/services/message_transformer.go @@ -0,0 +1,118 @@ +package services + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "time" + + "github.com/didil/inhooks/pkg/lib" + "github.com/didil/inhooks/pkg/models" + "github.com/dop251/goja" +) + +type MessageTransformer interface { + Transform(ctx context.Context, transformDefinition *models.TransformDefinition, m *models.Message) error +} + +type messageTransformer struct { + config *lib.TransformConfig +} + +func NewMessageTransformer(config *lib.TransformConfig) MessageTransformer { + return &messageTransformer{ + config: config, + } +} + +func (mt *messageTransformer) Transform(ctx context.Context, transformDefinition *models.TransformDefinition, m *models.Message) error { + switch transformDefinition.Type { + case models.TransformTypeJavascript: + transformedPayload, transformedHeaders, err := mt.runJavascriptTransform(m.Payload, m.HttpHeaders, transformDefinition.Script) + if err != nil { + return fmt.Errorf("failed to transform message: %w", err) + } + m.Payload = transformedPayload + m.HttpHeaders = transformedHeaders + return nil + + default: + return fmt.Errorf("unknown transform type: %s", transformDefinition.Type) + } +} + +func (mt *messageTransformer) runJavascriptTransform(payload []byte, headers http.Header, jsScript string) ([]byte, http.Header, error) { + vm := goja.New() + + time.AfterFunc(mt.config.JavascriptTimeout, func() { + vm.Interrupt("halt") + }) + + err := vm.Set("bodyStr", string(payload)) + if err != nil { + return nil, nil, fmt.Errorf("failed to set bodyStr: %w", err) + } + + headersStr, err := json.Marshal(headers) + if err != nil { + return nil, nil, fmt.Errorf("failed to marshal headers to JSON: %w", err) + } + err = vm.Set("headersStr", string(headersStr)) + if err != nil { + return nil, nil, fmt.Errorf("failed to set headersStr: %w", err) + } + + // Prepare the full script + fullScript := fmt.Sprintf(` + /* User Function */ + %s + /* End User Function */ + + const headers = JSON.parse(headersStr); + var results = transform(bodyStr, headers); + [results[0], results[1]]; + `, jsScript) + + // Run the script + val, err := vm.RunString(fullScript) + if err != nil { + return nil, nil, fmt.Errorf("failed to execute JavaScript: %w", err) + } + + // Get the results + results := val.Export().([]interface{}) + if len(results) != 2 { + return nil, nil, fmt.Errorf("expected 2 results in js transform, got %d", len(results)) + } + transformedPayloadStr, ok := results[0].(string) + if !ok { + return nil, nil, fmt.Errorf("expected payload to be of type string, got %T", results[0]) + } + transformedHeadersTemp, ok := results[1].(map[string]interface{}) + if !ok { + return nil, nil, fmt.Errorf("expected headers to be of type map[string]interface{}, got %T", results[1]) + } + + // build back the header object + transformedHeaders := http.Header{} + for k, values := range transformedHeadersTemp { + valuesArr, ok := values.([]interface{}) + if !ok { + return nil, nil, fmt.Errorf("expected header values to be of type []string, got %T", values) + } + + stringValuesArr := make([]string, len(valuesArr)) + for i, value := range valuesArr { + stringValue, ok := value.(string) + if !ok { + return nil, nil, fmt.Errorf("expected header value to be of type string, got %T", value) + } + stringValuesArr[i] = stringValue + } + + transformedHeaders[k] = stringValuesArr + } + + return []byte(transformedPayloadStr), transformedHeaders, nil +} diff --git a/pkg/supervisor/ready.go b/pkg/supervisor/ready.go index ec02a95..d62a173 100644 --- a/pkg/supervisor/ready.go +++ b/pkg/supervisor/ready.go @@ -79,6 +79,21 @@ func (s *Supervisor) startReadyProcessor(ctx context.Context, f *models.Flow, si zap.String("ingestedReqID", m.IngestedReqID), ) + if sink.Transform != nil { + transformDefinition := s.inhooksConfigSvc.GetTransformDefinition(sink.Transform.ID) + if transformDefinition == nil { + logger.Error("transform definition not found", zap.String("transformID", sink.Transform.ID)) + continue + } + + logger.Info("transforming message", zap.String("transformID", transformDefinition.ID)) + err := s.messageTransformer.Transform(ctx, transformDefinition, m) + if err != nil { + logger.Error("failed to transform payload", zap.Error(err)) + continue + } + } + logger.Info("processing message", zap.Int("attempt#", len(m.DeliveryAttempts)+1)) messageProcessingAttemptsCounter.Inc() diff --git a/pkg/supervisor/ready_test.go b/pkg/supervisor/ready_test.go index 8f3fc5f..9334e8c 100644 --- a/pkg/supervisor/ready_test.go +++ b/pkg/supervisor/ready_test.go @@ -28,9 +28,13 @@ func TestSupervisorHandleReadyQueue_OK(t *testing.T) { flowId1 := "flow-1" sinkID1 := "sink-1" + transformID1 := "transform-1" sink1 := &models.Sink{ ID: sinkID1, + Transform: &models.Transform{ + ID: transformID1, + }, } flow1 := &models.Flow{ @@ -41,12 +45,21 @@ func TestSupervisorHandleReadyQueue_OK(t *testing.T) { mID1 := "message-1" m := &models.Message{ - ID: mID1, + ID: mID1, + Payload: []byte(`{"id": "the-payload"}`), + } + + transformDefinition := &models.TransformDefinition{ + ID: "transform-definition-1", + Type: models.TransformTypeJavascript, + Script: "some script", } messageFetcher := mocks.NewMockMessageFetcher(ctrl) + messageTransformer := mocks.NewMockMessageTransformer(ctrl) messageProcessor := mocks.NewMockMessageProcessor(ctrl) processingResultsService := mocks.NewMockProcessingResultsService(ctrl) + inhooksConfigService := mocks.NewMockInhooksConfigService(ctrl) logger, err := zap.NewDevelopment() assert.NoError(t, err) @@ -56,6 +69,8 @@ func TestSupervisorHandleReadyQueue_OK(t *testing.T) { WithMessageProcessor(messageProcessor), WithProcessingResultsService(processingResultsService), WithAppConfig(appConf), + WithMessageTransformer(messageTransformer), + WithInhooksConfigService(inhooksConfigService), WithLogger(logger), ) @@ -70,9 +85,23 @@ func TestSupervisorHandleReadyQueue_OK(t *testing.T) { return m, nil } + // simulate blocking + time.Sleep(100 * time.Millisecond) return nil, nil }) + inhooksConfigService.EXPECT(). + GetTransformDefinition(transformID1). + DoAndReturn(func(sinkID string) *models.TransformDefinition { + return transformDefinition + }) + + messageTransformer.EXPECT(). + Transform(gomock.Any(), transformDefinition, m). + DoAndReturn(func(ctx context.Context, transformDefinition *models.TransformDefinition, message *models.Message) error { + return nil + }) + messageProcessor.EXPECT(). Process(gomock.Any(), sink1, m). DoAndReturn(func(ctx context.Context, sink *models.Sink, m *models.Message) error { @@ -101,9 +130,13 @@ func TestSupervisorHandleReadyQueue_Failed(t *testing.T) { flowId1 := "flow-1" sinkID1 := "sink-1" + transformID1 := "transform-1" sink1 := &models.Sink{ ID: sinkID1, + Transform: &models.Transform{ + ID: "transform-1", + }, } flow1 := &models.Flow{ @@ -117,9 +150,17 @@ func TestSupervisorHandleReadyQueue_Failed(t *testing.T) { ID: mID1, } + transformDefinition := &models.TransformDefinition{ + ID: "transform-definition-1", + Type: models.TransformTypeJavascript, + Script: "some script", + } + messageFetcher := mocks.NewMockMessageFetcher(ctrl) messageProcessor := mocks.NewMockMessageProcessor(ctrl) processingResultsService := mocks.NewMockProcessingResultsService(ctrl) + messageTransformer := mocks.NewMockMessageTransformer(ctrl) + inhooksConfigService := mocks.NewMockInhooksConfigService(ctrl) logger, err := zap.NewDevelopment() assert.NoError(t, err) @@ -127,6 +168,8 @@ func TestSupervisorHandleReadyQueue_Failed(t *testing.T) { s := NewSupervisor( WithMessageFetcher(messageFetcher), WithMessageProcessor(messageProcessor), + WithMessageTransformer(messageTransformer), + WithInhooksConfigService(inhooksConfigService), WithProcessingResultsService(processingResultsService), WithAppConfig(appConf), WithLogger(logger), @@ -147,6 +190,14 @@ func TestSupervisorHandleReadyQueue_Failed(t *testing.T) { return nil, nil }) + inhooksConfigService.EXPECT(). + GetTransformDefinition(transformID1). + Return(transformDefinition) + + messageTransformer.EXPECT(). + Transform(gomock.Any(), transformDefinition, m). + Return(nil) + messageProcessor.EXPECT(). Process(gomock.Any(), sink1, m). DoAndReturn(func(ctx context.Context, sink *models.Sink, m *models.Message) error { diff --git a/pkg/supervisor/supervisor.go b/pkg/supervisor/supervisor.go index 115aa0a..4e439b3 100644 --- a/pkg/supervisor/supervisor.go +++ b/pkg/supervisor/supervisor.go @@ -21,6 +21,7 @@ type Supervisor struct { schedulerSvc services.SchedulerService processingRecoverySvc services.ProcessingRecoveryService cleanupSvc services.CleanupService + messageTransformer services.MessageTransformer } type SupervisorOpt func(s *Supervisor) @@ -93,6 +94,12 @@ func WithCleanupService(cleanupSvc services.CleanupService) SupervisorOpt { } } +func WithMessageTransformer(messageTransformer services.MessageTransformer) SupervisorOpt { + return func(s *Supervisor) { + s.messageTransformer = messageTransformer + } +} + func (s *Supervisor) Start() { wg := &sync.WaitGroup{} flows := s.inhooksConfigSvc.GetFlows() diff --git a/pkg/testsupport/mocks/gen_mocks.sh b/pkg/testsupport/mocks/gen_mocks.sh index 6153d8f..de0d563 100755 --- a/pkg/testsupport/mocks/gen_mocks.sh +++ b/pkg/testsupport/mocks/gen_mocks.sh @@ -14,6 +14,7 @@ services=( "processing_recovery_service" "cleanup_service" "message_verifier" + "message_transformer" ) for service in ${services[@]} diff --git a/pkg/testsupport/mocks/mock_inhooks_config_service.go b/pkg/testsupport/mocks/mock_inhooks_config_service.go index 4c03c6b..79b3ad4 100644 --- a/pkg/testsupport/mocks/mock_inhooks_config_service.go +++ b/pkg/testsupport/mocks/mock_inhooks_config_service.go @@ -76,6 +76,20 @@ func (mr *MockInhooksConfigServiceMockRecorder) GetFlows() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFlows", reflect.TypeOf((*MockInhooksConfigService)(nil).GetFlows)) } +// GetTransformDefinition mocks base method. +func (m *MockInhooksConfigService) GetTransformDefinition(transformID string) *models.TransformDefinition { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTransformDefinition", transformID) + ret0, _ := ret[0].(*models.TransformDefinition) + return ret0 +} + +// GetTransformDefinition indicates an expected call of GetTransformDefinition. +func (mr *MockInhooksConfigServiceMockRecorder) GetTransformDefinition(transformID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTransformDefinition", reflect.TypeOf((*MockInhooksConfigService)(nil).GetTransformDefinition), transformID) +} + // Load mocks base method. func (m *MockInhooksConfigService) Load(path string) error { m.ctrl.T.Helper() diff --git a/pkg/testsupport/mocks/mock_message_transformer.go b/pkg/testsupport/mocks/mock_message_transformer.go new file mode 100644 index 0000000..8e00cbb --- /dev/null +++ b/pkg/testsupport/mocks/mock_message_transformer.go @@ -0,0 +1,50 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/services/message_transformer.go + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + models "github.com/didil/inhooks/pkg/models" + gomock "github.com/golang/mock/gomock" +) + +// MockMessageTransformer is a mock of MessageTransformer interface. +type MockMessageTransformer struct { + ctrl *gomock.Controller + recorder *MockMessageTransformerMockRecorder +} + +// MockMessageTransformerMockRecorder is the mock recorder for MockMessageTransformer. +type MockMessageTransformerMockRecorder struct { + mock *MockMessageTransformer +} + +// NewMockMessageTransformer creates a new mock instance. +func NewMockMessageTransformer(ctrl *gomock.Controller) *MockMessageTransformer { + mock := &MockMessageTransformer{ctrl: ctrl} + mock.recorder = &MockMessageTransformerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockMessageTransformer) EXPECT() *MockMessageTransformerMockRecorder { + return m.recorder +} + +// Transform mocks base method. +func (m *MockMessageTransformer) Transform(ctx context.Context, transformDefinition *models.TransformDefinition, message *models.Message) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Transform", ctx, transformDefinition, message) + ret0, _ := ret[0].(error) + return ret0 +} + +// Transform indicates an expected call of Transform. +func (mr *MockMessageTransformerMockRecorder) Transform(ctx, transformDefinition, message interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Transform", reflect.TypeOf((*MockMessageTransformer)(nil).Transform), ctx, transformDefinition, message) +}