Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Message transform with javascript #55

Merged
merged 3 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ HOST=localhost
PORT=3000
REDIS_URL=redis://localhost:6379
REDIS_INHOOKS_DB_NAME=development
TRANSFORM_JAVASCRIPT_TIMEOUT=1s
2 changes: 1 addition & 1 deletion .env.test.example
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ APP_ENV=test
HOST=localhost
PORT=3001
REDIS_URL=redis://localhost:6379
REDIS_INHOOKS_DB_NAME=test
REDIS_INHOOKS_DB_NAME=test
77 changes: 77 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Inhooks listens to HTTP webhooks and saves the messages to Redis. A processing m
- Supports delayed processing
- Supports retries on failure with configurable number of attempts, interval and constant or exponential backoff
- Supports different HTTP payloads types: JSON, x-www-form-urlencoded, multipart/form-data
- Supports message transformation using JavaScript ECMAScript 5.1
- ... more features planned

## Usage
Expand Down Expand Up @@ -79,6 +80,82 @@ flows:
currentSecretEnvVar: VERIFICATION_FLOW_1_CURRENT_SECRET # the name of the environment variable containing the verification secret
previousSecretEnvVar: VERIFICATION_FLOW_1_PREVIOUS_SECRET # optional env var that allows rotating secrets without service interruption
```

### Message transformation

#### Transform definition

Message transformation allows you to modify the payload and headers of messages before they are sent to the sinks (destinations). This can be useful for tasks such as adding or removing fields, changing the format of the data, or adding custom headers.

Currently, only JavaScript transformations are supported. The JavaScript function should be named `transform` and should take two parameters: `bodyStr` (the message body as a string) and `headers` (the message headers as a JSON object). The function should return an array with two elements: the transformed payload as a string and the transformed headers as a JSON object.
The `headers` fields has the following format:
```
{
"header-name": ["value1", "value2"]
}
```

Only JavaScript ECMAScript 5.1 is supported at the moment. We use the [goja](https://github.com/dop251/goja) library to execute the JavaScript code. You can read about the limitations on goja's documentation pages.

Here is an example configuration:
```yaml
flows:
- id: flow-1
source:
id: source-1
slug: source-1-slug
type: http
sinks:
- id: sink-1
type: http
url: https://example.com/target
transform:
id: js-transform-1
transform_definitions:
- id: js-transform-1
type: javascript
script: |
function transform(bodyStr, headers) {
const body = JSON.parse(bodyStr);

// add a header
headers["X-INHOOKS-TRANSFORMED"] = ["1"];
// capitalize the message if present
if (body.msg) {
body.msg = body.msg.toUpperCase();
}
// delete a key from the body
delete body.my_dummy_key;

return [JSON.stringify(body), headers];
}
```


#### Testing transform scripts

You can use the `/api/v1/transform` endpoint to test your transform scripts before adding them to your flow configuration. This endpoint allows you to simulate the transformation process and see the results immediately.

To use this endpoint, send a POST request with a JSON payload containing the following fields:
- `body`: The message body as a string
- `headers`: The message headers as a JSON object
- `transformDefinition`: An object containing the `type` and `script` of your transformation

Here's an example of how to use the `/api/v1/transform` endpoint:
```shell
curl -X POST http://localhost:3000/api/v1/transform \
-H "Content-Type: application/json" \
-d '{
"body": "{\"msg\": \"hello world\", \"my_dummy_key\": \"value\"}",
"headers": {"Content-Type": ["application/json"]},
"transformDefinition": {
"type": "javascript",
"script": "function transform(bodyStr, headers) { const body = JSON.parse(bodyStr); headers[\"X-INHOOKS-TRANSFORMED\"] = [\"1\"]; if (body.msg) { body.msg = body.msg.toUpperCase(); } delete body.my_dummy_key; return [JSON.stringify(body), headers]; }"
}
}'
```


### Prometheus metrics
Inhooks exposes Prometheus metrics at the `/api/v1/metrics` endpoint.

Expand Down
15 changes: 15 additions & 0 deletions cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"flag"
"fmt"
"log"
"net/http"
Expand All @@ -26,6 +27,15 @@ var (
func main() {
versionpkg.SetVersion(version)

// handle version command
isVersionCmd := flag.Bool("version", false, "print the version")
flag.Parse()
if *isVersionCmd {
fmt.Println(version)
os.Exit(0)
}

// start server
err := lib.LoadEnv()
if err != nil {
log.Fatalf("failed to load env: %v", err)
Expand All @@ -41,6 +51,8 @@ func main() {
log.Fatalf("failed to initialize logger: %v", err)
}

logger.Info("starting Inhooks", zap.String("version", version))

inhooksConfigSvc := services.NewInhooksConfigService(logger, appConf)
logger.Info("loading inhooks config", zap.String("inhooksConfigFile", appConf.InhooksConfigFile))

Expand All @@ -65,13 +77,15 @@ func main() {
messageEnqueuer := services.NewMessageEnqueuer(redisStore, timeSvc)
messageFetcher := services.NewMessageFetcher(redisStore, timeSvc)
messageVerifier := services.NewMessageVerifier()
messageTransformer := services.NewMessageTransformer(&appConf.Transform)

app := handlers.NewApp(
handlers.WithLogger(logger),
handlers.WithInhooksConfigService(inhooksConfigSvc),
handlers.WithMessageBuilder(messageBuilder),
handlers.WithMessageEnqueuer(messageEnqueuer),
handlers.WithMessageVerifier(messageVerifier),
handlers.WithMessageTransformer(messageTransformer),
)

r := server.NewRouter(app)
Expand Down Expand Up @@ -119,6 +133,7 @@ func main() {
supervisor.WithSchedulerService(schedulerSvc),
supervisor.WithProcessingRecoveryService(processingRecoverySvc),
supervisor.WithCleanupService(cleanupSvc),
supervisor.WithMessageTransformer(messageTransformer),
)

wg.Add(1)
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/denis-tingaikin/go-header v0.4.3 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dlclark/regexp2 v1.7.0 // indirect
github.com/dop251/goja v0.0.0-20240627195025-eb1f15ee67d2 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/esimonov/ifshort v1.0.4 // indirect
github.com/ettle/strcase v0.1.1 // indirect
Expand All @@ -58,6 +60,7 @@ require (
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/fzipp/gocyclo v0.6.0 // indirect
github.com/go-critic/go-critic v0.7.0 // indirect
github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect
github.com/go-toolsmith/astcast v1.1.0 // indirect
github.com/go-toolsmith/astcopy v1.1.0 // indirect
github.com/go-toolsmith/astequal v1.1.0 // indirect
Expand All @@ -79,6 +82,7 @@ require (
github.com/golangci/revgrep v0.0.0-20220804021717-745bb2f7c2e6 // indirect
github.com/golangci/unconvert v0.0.0-20180507085042-28b1c447d1f4 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/pprof v0.0.0-20230207041349-798e818bf904 // indirect
github.com/gordonklaus/ineffassign v0.0.0-20230107090616-13ace0543b28 // indirect
github.com/gostaticanalysis/analysisutil v0.7.1 // indirect
github.com/gostaticanalysis/comment v1.4.2 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczC
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dlclark/regexp2 v1.7.0 h1:7lJfhqlPssTb1WQx4yvTHN0uElPEv52sbaECrAQxjAo=
github.com/dlclark/regexp2 v1.7.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
github.com/dop251/goja v0.0.0-20240627195025-eb1f15ee67d2 h1:4Ew88p5s9dwIk5/woUyqI9BD89NgZoUNH4/rM/h2UDg=
github.com/dop251/goja v0.0.0-20240627195025-eb1f15ee67d2/go.mod h1:o31y53rb/qiIAONF7w3FHJZRqqP3fzHUr1HqanthByw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -159,6 +163,8 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU=
github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-toolsmith/astcast v1.1.0 h1:+JN9xZV1A+Re+95pgnMgDboWNVnIMMQXwfBwLRPgSC8=
github.com/go-toolsmith/astcast v1.1.0/go.mod h1:qdcuFWeGGS2xX5bLM/c3U9lewg7+Zu4mr+xPwZIB4ZU=
Expand Down Expand Up @@ -264,6 +270,8 @@ github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20230207041349-798e818bf904 h1:4/hN5RUoecvl+RmJRE2YxKWtnnQls6rQjjW5oV7qg2U=
github.com/google/pprof v0.0.0-20230207041349-798e818bf904/go.mod h1:uglQLonpP8qtYCYyzA+8c/9qtqgA3qsXGYqCPKARAFg=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
Expand Down
5 changes: 5 additions & 0 deletions pkg/lib/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type AppConfig struct {
Supervisor SupervisorConfig
HTTPClient HTTPClientConfig
Sink SinkConfig
Transform TransformConfig
}

type ServerConfig struct {
Expand Down Expand Up @@ -57,6 +58,10 @@ type SinkConfig struct {
DefaultRetryExpMultiplier float64 `env:"SINK_DEFAULT_RETRY_EXP_MULTIPLIER,default=1"`
}

type TransformConfig struct {
JavascriptTimeout time.Duration `env:"TRANSFORM_JAVASCRIPT_TIMEOUT,default=1s"`
}

func InitAppConfig(ctx context.Context) (*AppConfig, error) {
appConf := &AppConfig{}
err := envconfig.Process(ctx, appConf)
Expand Down
32 changes: 31 additions & 1 deletion pkg/models/inhooks_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
)

type InhooksConfig struct {
Flows []*Flow `yaml:"flows"`
Flows []*Flow `yaml:"flows"`
TransformDefinitions []*TransformDefinition `yaml:"transform_definitions"`
}

var idRegex = regexp.MustCompile(`^[a-zA-Z0-9\-]{1,255}$`)
Expand All @@ -27,6 +28,28 @@ func ValidateInhooksConfig(appConf *lib.AppConfig, c *InhooksConfig) error {

flowIDs := map[string]bool{}
sourceSlugs := map[string]bool{}
transformIDs := map[string]bool{}

if c.TransformDefinitions != nil {
for i, transform := range c.TransformDefinitions {
if !slices.Contains(TransformTypes, transform.Type) {
return fmt.Errorf("invalid transform type: %s. allowed: %v", transform.Type, TransformTypes)
}

if !idRegex.MatchString(transform.ID) {
return idValidationErr(fmt.Sprintf("transforms[%d].id", i))
}

if transformIDs[transform.ID] {
return fmt.Errorf("transform ids must be unique. duplicate transform id: %s", transform.ID)
}
transformIDs[transform.ID] = true

if transform.Script == "" {
return fmt.Errorf("transform script cannot be empty")
}
}
}

for i, f := range c.Flows {
if !idRegex.MatchString(f.ID) {
Expand Down Expand Up @@ -119,6 +142,13 @@ func ValidateInhooksConfig(appConf *lib.AppConfig, c *InhooksConfig) error {
return fmt.Errorf("invalid url scheme: %s", sink.URL)
}
}

// validate transform
if sink.Transform != nil {
if !transformIDs[sink.Transform.ID] {
return fmt.Errorf("transform id not found: %s", sink.Transform.ID)
}
}
}
}

Expand Down
Loading
Loading