Skip to content

Commit

Permalink
json lua transform wip
Browse files Browse the repository at this point in the history
  • Loading branch information
didil committed Jun 27, 2024
1 parent e9cffb8 commit f14ed84
Show file tree
Hide file tree
Showing 16 changed files with 395 additions and 24 deletions.
2 changes: 2 additions & 0 deletions cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func main() {
}

cleanupSvc := services.NewCleanupService(redisStore, timeSvc)
payloadTransformer := services.NewPayloadTransformer()

svisor := supervisor.NewSupervisor(
supervisor.WithLogger(logger),
Expand All @@ -119,6 +120,7 @@ func main() {
supervisor.WithSchedulerService(schedulerSvc),
supervisor.WithProcessingRecoveryService(processingRecoverySvc),
supervisor.WithCleanupService(cleanupSvc),
supervisor.WithPayloadTransformer(payloadTransformer),
)

wg.Add(1)
Expand Down
32 changes: 31 additions & 1 deletion pkg/models/inhooks_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
)

type InhooksConfig struct {
Flows []*Flow `yaml:"flows"`
Flows []*Flow `yaml:"flows"`
TransformDefinitions []*TransformDefinition `yaml:"transform_definitions"`
}

var idRegex = regexp.MustCompile(`^[a-zA-Z0-9\-]{1,255}$`)
Expand All @@ -27,6 +28,28 @@ func ValidateInhooksConfig(appConf *lib.AppConfig, c *InhooksConfig) error {

flowIDs := map[string]bool{}
sourceSlugs := map[string]bool{}
luaTransformIDs := map[string]bool{}

if c.TransformDefinitions != nil {
for i, transform := range c.TransformDefinitions {
if !slices.Contains(TransformTypes, transform.Type) {
return fmt.Errorf("invalid transform type: %s. allowed: %v", transform.Type, TransformTypes)
}

if !idRegex.MatchString(transform.ID) {
return idValidationErr(fmt.Sprintf("transforms[%d].id", i))
}

if luaTransformIDs[transform.ID] {
return fmt.Errorf("transform ids must be unique. duplicate transform id: %s", transform.ID)
}
luaTransformIDs[transform.ID] = true

if transform.Script == "" {
return fmt.Errorf("transform script cannot be empty")
}
}
}

for i, f := range c.Flows {
if !idRegex.MatchString(f.ID) {
Expand Down Expand Up @@ -119,6 +142,13 @@ func ValidateInhooksConfig(appConf *lib.AppConfig, c *InhooksConfig) error {
return fmt.Errorf("invalid url scheme: %s", sink.URL)
}
}

// validate transform
if sink.Transform != nil {
if !luaTransformIDs[sink.Transform.ID] {
return fmt.Errorf("lua transform id not found: %s", sink.Transform.ID)
}
}
}
}

Expand Down
123 changes: 123 additions & 0 deletions pkg/models/inhooks_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ func TestValidateInhooksConfig_OK(t *testing.T) {
Type: "http",
URL: "https://example.com/sink",
Delay: &delay,
Transform: &Transform{
ID: "lua-transform-1",
},
},
},
},
Expand All @@ -57,6 +60,13 @@ func TestValidateInhooksConfig_OK(t *testing.T) {
},
},
},
TransformDefinitions: []*TransformDefinition{
{
ID: "lua-transform-1",
Type: TransformTypeLua,
Script: "function transform(data) data.username = data.name end",
},
},
}

assert.NoError(t, ValidateInhooksConfig(appConf, c))
Expand Down Expand Up @@ -359,3 +369,116 @@ func TestValidateInhooksConfig_InvalidHMACAlgorithm(t *testing.T) {

assert.ErrorContains(t, ValidateInhooksConfig(appConf, c), "invalid hmac algorithm: somealgorithm. allowed: [sha256]")
}
func TestValidateInhooksConfig_InexistingTransformID(t *testing.T) {
ctx := context.Background()
appConf, err := testsupport.InitAppConfig(ctx)
assert.NoError(t, err)

c := &InhooksConfig{
Flows: []*Flow{
{
ID: "flow-1",
Source: &Source{
ID: "source-1",
Slug: "source-1-slug",
Type: "http",
},
Sinks: []*Sink{
{
ID: "sink-1",
Type: "http",
URL: "https://example.com/sink",
Transform: &Transform{
ID: "non-existent-transform",
},
},
},
},
},
TransformDefinitions: []*TransformDefinition{
{
ID: "lua-transform-1",
Type: TransformTypeLua,
Script: "function transform(data) end",
},
},
}

assert.ErrorContains(t, ValidateInhooksConfig(appConf, c), "transform id not found: non-existent-transform")
}

func TestValidateInhooksConfig_InvalidTransformType(t *testing.T) {
ctx := context.Background()
appConf, err := testsupport.InitAppConfig(ctx)
assert.NoError(t, err)

c := &InhooksConfig{
Flows: []*Flow{
{
ID: "flow-1",
Source: &Source{
ID: "source-1",
Slug: "source-1-slug",
Type: "http",
},
Sinks: []*Sink{
{
ID: "sink-1",
Type: "http",
URL: "https://example.com/sink",
Transform: &Transform{
ID: "lua-transform-1",
},
},
},
},
},
TransformDefinitions: []*TransformDefinition{
{
ID: "lua-transform-1",
Type: "invalid-type",
Script: "function transform(data) end",
},
},
}

assert.ErrorContains(t, ValidateInhooksConfig(appConf, c), "invalid transform type: invalid-type. allowed: [lua]")
}

func TestValidateInhooksConfig_EmptyTransformScript(t *testing.T) {
ctx := context.Background()
appConf, err := testsupport.InitAppConfig(ctx)
assert.NoError(t, err)

c := &InhooksConfig{
Flows: []*Flow{
{
ID: "flow-1",
Source: &Source{
ID: "source-1",
Slug: "source-1-slug",
Type: "http",
},
Sinks: []*Sink{
{
ID: "sink-1",
Type: "http",
URL: "https://example.com/sink",
Transform: &Transform{
ID: "lua-transform-1",
},
},
},
},
},
TransformDefinitions: []*TransformDefinition{
{
ID: "lua-transform-1",
Type: TransformTypeLua,
Script: "",
},
},
}

assert.ErrorContains(t, ValidateInhooksConfig(appConf, c), "transform script cannot be empty")
}
2 changes: 2 additions & 0 deletions pkg/models/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ type Sink struct {
RetryExpMultiplier *float64 `yaml:"retryExpMultiplier"`
// Max attempts
MaxAttempts *int `yaml:"maxAttempts"`
// Transform to apply to the data
Transform *Transform `yaml:"transform"`
}
21 changes: 21 additions & 0 deletions pkg/models/transform.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package models

type TransformType string

const (
TransformTypeLua = "lua"
)

var TransformTypes = []TransformType{
TransformTypeLua,
}

type TransformDefinition struct {
ID string `yaml:"id"`
Type TransformType `yaml:"type"`
Script string `yaml:"script"`
}

type Transform struct {
ID string `yaml:"id"`
}
39 changes: 34 additions & 5 deletions pkg/services/inhooks_config_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ type InhooksConfigService interface {
FindFlowForSource(sourceSlug string) *models.Flow
GetFlow(flowID string) *models.Flow
GetFlows() map[string]*models.Flow
GetTransformDefinition(transformID string) *models.TransformDefinition
}

type inhooksConfigService struct {
logger *zap.Logger
appConf *lib.AppConfig
inhooksConfig *models.InhooksConfig
flowsBySourceSlug map[string]*models.Flow
flowsByID map[string]*models.Flow
logger *zap.Logger
appConf *lib.AppConfig
inhooksConfig *models.InhooksConfig
flowsBySourceSlug map[string]*models.Flow
flowsByID map[string]*models.Flow
transformDefinitionsByID map[string]*models.TransformDefinition
}

func NewInhooksConfigService(logger *zap.Logger, appConf *lib.AppConfig) InhooksConfigService {
Expand Down Expand Up @@ -60,6 +62,11 @@ func (s *inhooksConfigService) Load(filepath string) error {
return errors.Wrapf(err, "failed to build flows map")
}

err = s.initTransformDefinitionsMap()
if err != nil {
return errors.Wrapf(err, "failed to build transform definitions map")
}

s.log()

return nil
Expand All @@ -77,6 +84,10 @@ func (s *inhooksConfigService) GetFlows() map[string]*models.Flow {
return s.flowsByID
}

func (s *inhooksConfigService) GetTransformDefinition(transformID string) *models.TransformDefinition {
return s.transformDefinitionsByID[transformID]
}

func (s *inhooksConfigService) initFlowsMaps() error {
s.flowsBySourceSlug = map[string]*models.Flow{}
s.flowsByID = map[string]*models.Flow{}
Expand Down Expand Up @@ -104,7 +115,24 @@ func (s *inhooksConfigService) initFlowsMaps() error {
return nil
}

func (s *inhooksConfigService) initTransformDefinitionsMap() error {
s.transformDefinitionsByID = map[string]*models.TransformDefinition{}
for _, transformDefinition := range s.inhooksConfig.TransformDefinitions {
s.transformDefinitionsByID[transformDefinition.ID] = transformDefinition
}

return nil
}

func (s *inhooksConfigService) log() {
for _, transform := range s.inhooksConfig.TransformDefinitions {
s.logger.Info("loaded transform",
zap.String("id", transform.ID),
zap.String("type", string(transform.Type)),
zap.String("script", transform.Script),
)
}

for _, f := range s.flowsByID {
s.logger.Info("loaded flow",
zap.String("id", f.ID),
Expand All @@ -119,6 +147,7 @@ func (s *inhooksConfigService) log() {
zap.String("type", string(sink.Type)),
zap.String("url", string(sink.URL)),
zap.Durationp("delay", sink.Delay),
zap.Any("transform", sink.Transform),
)
}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/services/message_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

type MessageProcessor interface {
Process(ctx context.Context, sink *models.Sink, m *models.Message) error
Process(ctx context.Context, sink *models.Sink, m *models.Message, transformedPayload []byte) error
}

type messageProcessor struct {
Expand All @@ -25,17 +25,17 @@ func NewMessageProcessor(httpClient *http.Client) MessageProcessor {
}
}

func (p *messageProcessor) Process(ctx context.Context, sink *models.Sink, m *models.Message) error {
func (p *messageProcessor) Process(ctx context.Context, sink *models.Sink, m *models.Message, transformedPayload []byte) error {
switch sink.Type {
case models.SinkTypeHttp:
return p.processHTTP(ctx, sink, m)
return p.processHTTP(ctx, sink, m, transformedPayload)
default:
return fmt.Errorf("unkown sink type %s", sink.Type)
}
}

func (p *messageProcessor) processHTTP(ctx context.Context, sink *models.Sink, m *models.Message) error {
buf := bytes.NewBuffer(m.Payload)
func (p *messageProcessor) processHTTP(ctx context.Context, sink *models.Sink, m *models.Message, transformedPayload []byte) error {
buf := bytes.NewBuffer(transformedPayload)

req, err := http.NewRequestWithContext(ctx, http.MethodPost, sink.URL, buf)
if err != nil {
Expand Down
8 changes: 5 additions & 3 deletions pkg/services/message_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ func TestMessageProcessor(t *testing.T) {
p := NewMessageProcessor(cl)

payload := []byte(`{"id": "the-payload"}`)
transformedPayload := []byte(`{"id": "the-transformed-payload"}`)

headers := http.Header{
"X-Key": []string{"123"},
"User-Agent": []string{"Sender-User-Agent"},
Expand All @@ -31,14 +33,14 @@ func TestMessageProcessor(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
body, err := io.ReadAll(req.Body)
assert.NoError(t, err)
assert.Equal(t, payload, body)
assert.Equal(t, transformedPayload, body)

assert.Equal(t, rawQuery, req.URL.RawQuery)

assert.Equal(t, http.Header{
"X-Key": []string{"123"},
"User-Agent": []string{"Inhooks/test (https://github.com/didil/inhooks)"},
"Content-Length": []string{"21"},
"Content-Length": []string{"33"},
"Accept-Encoding": []string{"*"},
}, req.Header)
}))
Expand All @@ -55,7 +57,7 @@ func TestMessageProcessor(t *testing.T) {
Payload: payload,
}

err := p.Process(ctx, sink, m)
err := p.Process(ctx, sink, m, transformedPayload)
assert.NoError(t, err)
}

Expand Down
Loading

0 comments on commit f14ed84

Please sign in to comment.