Skip to content

Commit

Permalink
Allow a connector's or processor's plugin name to be updated (#1938)
Browse files Browse the repository at this point in the history
* allow plugin change

* tests

* update tests

* tests

* log

* update processors

* fix tests

* update test

---------

Co-authored-by: Raúl Barroso <[email protected]>
  • Loading branch information
hariso and raulb authored Nov 8, 2024
1 parent 048a99c commit 977b037
Show file tree
Hide file tree
Showing 30 changed files with 275 additions and 290 deletions.
7 changes: 6 additions & 1 deletion pkg/connector/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,17 @@ func (s *Service) Delete(ctx context.Context, id string, dispenserFetcher Plugin
}

// Update updates the connector config.
func (s *Service) Update(ctx context.Context, id string, data Config) (*Instance, error) {
func (s *Service) Update(ctx context.Context, id string, plugin string, data Config) (*Instance, error) {
conn, err := s.Get(ctx, id)
if err != nil {
return nil, err
}

if conn.Plugin != plugin {
s.logger.Warn(ctx).Msgf("connector plugin changing from %v to %v, "+
"this may lead to unexpected behavior and configuration issues.", conn.Plugin, plugin)
}
conn.Plugin = plugin
conn.Config = data
conn.UpdatedAt = time.Now().UTC()

Expand Down
6 changes: 4 additions & 2 deletions pkg/connector/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ func TestService_UpdateSuccess(t *testing.T) {
Name: "changed-name",
Settings: map[string]string{"foo": "bar"},
}
wantPlugin := "changed-plugin"

conn, err := service.Create(
ctx,
Expand All @@ -530,10 +531,11 @@ func TestService_UpdateSuccess(t *testing.T) {
is.NoErr(err)

beforeUpdate := time.Now()
got, err := service.Update(ctx, conn.ID, want)
got, err := service.Update(ctx, conn.ID, wantPlugin, want)
is.NoErr(err)

is.Equal(got.Config, want)
is.Equal(got.Plugin, wantPlugin)
is.True(!got.UpdatedAt.Before(beforeUpdate))
}

Expand All @@ -545,7 +547,7 @@ func TestService_UpdateInstanceNotFound(t *testing.T) {

service := NewService(logger, db, nil)
// update connector that does not exist
got, err := service.Update(ctx, uuid.NewString(), Config{})
got, err := service.Update(ctx, uuid.NewString(), "foo-plugin", Config{})
is.True(err != nil)
is.True(cerrors.Is(err, ErrInstanceNotFound))
is.Equal(got, nil)
Expand Down
6 changes: 3 additions & 3 deletions pkg/orchestrator/connectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (c *ConnectorOrchestrator) Delete(ctx context.Context, id string) error {
return nil
}

func (c *ConnectorOrchestrator) Update(ctx context.Context, id string, config connector.Config) (*connector.Instance, error) {
func (c *ConnectorOrchestrator) Update(ctx context.Context, id string, plugin string, config connector.Config) (*connector.Instance, error) {
var r rollback.R
defer r.MustExecute()
txn, ctx, err := c.db.NewTransaction(ctx, true)
Expand Down Expand Up @@ -181,12 +181,12 @@ func (c *ConnectorOrchestrator) Update(ctx context.Context, id string, config co
}

oldConfig := conn.Config
conn, err = c.connectors.Update(ctx, id, config)
conn, err = c.connectors.Update(ctx, id, plugin, config)
if err != nil {
return nil, err
}
r.Append(func() error {
_, err = c.connectors.Update(ctx, id, oldConfig)
_, err = c.connectors.Update(ctx, id, conn.Plugin, oldConfig)
return err
})
err = txn.Commit()
Expand Down
12 changes: 6 additions & 6 deletions pkg/orchestrator/connectors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,11 +485,11 @@ func TestConnectorOrchestrator_Update_Success(t *testing.T) {
ValidateSourceConfig(gomock.Any(), conn.Plugin, newConfig.Settings).
Return(nil)
consMock.EXPECT().
Update(gomock.AssignableToTypeOf(ctxType), conn.ID, newConfig).
Update(gomock.AssignableToTypeOf(ctxType), conn.ID, conn.Plugin, newConfig).
Return(want, nil)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
got, err := orc.Connectors.Update(ctx, conn.ID, newConfig)
got, err := orc.Connectors.Update(ctx, conn.ID, conn.Plugin, newConfig)
is.NoErr(err)
is.Equal(got, want)
}
Expand All @@ -507,7 +507,7 @@ func TestConnectorOrchestrator_Update_ConnectorNotExist(t *testing.T) {
Return(nil, wantErr)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
got, err := orc.Connectors.Update(ctx, id, connector.Config{})
got, err := orc.Connectors.Update(ctx, id, "test-plugin", connector.Config{})
is.True(got == nil)
is.True(err != nil)
is.True(cerrors.Is(err, wantErr))
Expand Down Expand Up @@ -537,7 +537,7 @@ func TestConnectorOrchestrator_Update_PipelineRunning(t *testing.T) {
Return(pl, nil)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
got, err := orc.Connectors.Update(ctx, conn.ID, connector.Config{})
got, err := orc.Connectors.Update(ctx, conn.ID, conn.Plugin, connector.Config{})
is.True(got == nil)
is.True(err != nil)
is.Equal(pipeline.ErrPipelineRunning, err)
Expand Down Expand Up @@ -571,11 +571,11 @@ func TestConnectorOrchestrator_Update_Fail(t *testing.T) {
ValidateDestinationConfig(gomock.Any(), conn.Plugin, conn.Config.Settings).
Return(nil)
consMock.EXPECT().
Update(gomock.AssignableToTypeOf(ctxType), conn.ID, connector.Config{}).
Update(gomock.AssignableToTypeOf(ctxType), conn.ID, conn.Plugin, connector.Config{}).
Return(nil, wantErr)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
got, err := orc.Connectors.Update(ctx, conn.ID, connector.Config{})
got, err := orc.Connectors.Update(ctx, conn.ID, conn.Plugin, connector.Config{})
is.True(got == nil)
is.True(err != nil)
is.True(cerrors.Is(err, wantErr))
Expand Down
24 changes: 12 additions & 12 deletions pkg/orchestrator/mock/orchestrator.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type ConnectorService interface {
Get(ctx context.Context, id string) (*connector.Instance, error)
Create(ctx context.Context, id string, t connector.Type, plugin string, pipelineID string, c connector.Config, p connector.ProvisionType) (*connector.Instance, error)
Delete(ctx context.Context, id string, dispenserFetcher connector.PluginDispenserFetcher) error
Update(ctx context.Context, id string, c connector.Config) (*connector.Instance, error)
Update(ctx context.Context, id string, plugin string, c connector.Config) (*connector.Instance, error)

AddProcessor(ctx context.Context, connectorID string, processorID string) (*connector.Instance, error)
RemoveProcessor(ctx context.Context, connectorID string, processorID string) (*connector.Instance, error)
Expand All @@ -108,7 +108,7 @@ type ProcessorService interface {
Get(ctx context.Context, id string) (*processor.Instance, error)
Create(ctx context.Context, id string, plugin string, parent processor.Parent, cfg processor.Config, p processor.ProvisionType, condition string) (*processor.Instance, error)
MakeRunnableProcessor(ctx context.Context, i *processor.Instance) (*processor.RunnableProcessor, error)
Update(ctx context.Context, id string, cfg processor.Config) (*processor.Instance, error)
Update(ctx context.Context, id string, plugin string, cfg processor.Config) (*processor.Instance, error)
Delete(ctx context.Context, id string) error
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/orchestrator/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (p *ProcessorOrchestrator) Get(ctx context.Context, id string) (*processor.
return p.processors.Get(ctx, id)
}

func (p *ProcessorOrchestrator) Update(ctx context.Context, id string, cfg processor.Config) (*processor.Instance, error) {
func (p *ProcessorOrchestrator) Update(ctx context.Context, id string, plugin string, cfg processor.Config) (*processor.Instance, error) {
var r rollback.R
defer r.MustExecute()

Expand All @@ -157,6 +157,7 @@ func (p *ProcessorOrchestrator) Update(ctx context.Context, id string, cfg proce
return nil, cerrors.Errorf("processor %q cannot be updated: %w", proc.ID, ErrImmutableProvisionedByConfig)
}
// provisioned by API
oldPlugin := proc.Plugin
oldConfig := proc.Config

pl, err := p.getProcessorsPipeline(ctx, proc.Parent)
Expand All @@ -168,12 +169,12 @@ func (p *ProcessorOrchestrator) Update(ctx context.Context, id string, cfg proce
return nil, pipeline.ErrPipelineRunning
}

proc, err = p.processors.Update(ctx, id, cfg)
proc, err = p.processors.Update(ctx, id, plugin, cfg)
if err != nil {
return nil, err
}
r.Append(func() error {
_, err = p.processors.Update(ctx, proc.ID, oldConfig)
_, err = p.processors.Update(ctx, proc.ID, oldPlugin, oldConfig)
return err
})

Expand Down
16 changes: 8 additions & 8 deletions pkg/orchestrator/processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,11 +375,11 @@ func TestProcessorOrchestrator_UpdateOnPipeline_Success(t *testing.T) {
Get(gomock.AssignableToTypeOf(ctxType), pl.ID).
Return(pl, nil)
procsMock.EXPECT().
Update(gomock.AssignableToTypeOf(ctxType), want.ID, want.Config).
Update(gomock.AssignableToTypeOf(ctxType), want.ID, want.Plugin, want.Config).
Return(want, nil)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
got, err := orc.Processors.Update(ctx, before.ID, newConfig)
got, err := orc.Processors.Update(ctx, before.ID, before.Plugin, newConfig)
is.NoErr(err)
is.Equal(want, got)
}
Expand Down Expand Up @@ -417,7 +417,7 @@ func TestProcessorOrchestrator_UpdateOnPipeline_ProcessorNotExist(t *testing.T)
Return(nil, wantErr)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
got, err := orc.Processors.Update(ctx, before.ID, newConfig)
got, err := orc.Processors.Update(ctx, before.ID, before.Plugin, newConfig)
is.True(err != nil)
is.True(cerrors.Is(err, wantErr)) // errors did not match")
is.True(got == nil)
Expand Down Expand Up @@ -458,7 +458,7 @@ func TestProcessorOrchestrator_UpdateOnPipeline_PipelineRunning(t *testing.T) {
Return(pl, nil)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
got, err := orc.Processors.Update(ctx, before.ID, newConfig)
got, err := orc.Processors.Update(ctx, before.ID, before.Plugin, newConfig)
is.True(err != nil)
is.Equal(pipeline.ErrPipelineRunning, err)
is.True(got == nil)
Expand Down Expand Up @@ -498,7 +498,7 @@ func TestProcessorOrchestrator_UpdateOnPipeline_ProcessorProvisionedByConfig(t *
Return(before, nil)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
got, err := orc.Processors.Update(ctx, before.ID, newConfig)
got, err := orc.Processors.Update(ctx, before.ID, before.Plugin, newConfig)
is.Equal(got, nil)
is.True(err != nil)
is.True(cerrors.Is(err, ErrImmutableProvisionedByConfig)) // expected ErrImmutableProvisionedByConfig
Expand Down Expand Up @@ -550,11 +550,11 @@ func TestProcessorOrchestrator_UpdateOnPipeline_UpdateFail(t *testing.T) {
Get(gomock.AssignableToTypeOf(ctxType), pl.ID).
Return(pl, nil)
procsMock.EXPECT().
Update(gomock.AssignableToTypeOf(ctxType), want.ID, want.Config).
Update(gomock.AssignableToTypeOf(ctxType), want.ID, want.Plugin, want.Config).
Return(nil, wantErr)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
got, err := orc.Processors.Update(ctx, before.ID, newConfig)
got, err := orc.Processors.Update(ctx, before.ID, before.Plugin, newConfig)
is.True(err != nil)
is.Equal(wantErr, err)
is.True(got == nil)
Expand Down Expand Up @@ -586,7 +586,7 @@ func TestProcessorOrchestrator_UpdateOnConnector_ConnectorNotExist(t *testing.T)
Return(nil, wantErr)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
got, err := orc.Processors.Update(ctx, want.ID, processor.Config{})
got, err := orc.Processors.Update(ctx, want.ID, want.Plugin, processor.Config{})
is.True(err != nil)
is.True(cerrors.Is(err, wantErr)) // errors did not match
is.True(got == nil)
Expand Down
11 changes: 10 additions & 1 deletion pkg/processor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,25 @@ func (s *Service) Create(
}

// Update will update a processor instance config.
func (s *Service) Update(ctx context.Context, id string, cfg Config) (*Instance, error) {
func (s *Service) Update(ctx context.Context, id string, plugin string, cfg Config) (*Instance, error) {
instance, err := s.Get(ctx, id)
if err != nil {
return nil, err
}
if plugin == "" {
return nil, cerrors.Errorf("could not update processor instance (ID: %s): plugin name is empty", id)
}

if instance.running {
return nil, cerrors.Errorf("could not update processor instance (ID: %s): %w", id, ErrProcessorRunning)
}

if instance.Plugin != plugin {
s.logger.Warn(ctx).Msgf("processor plugin changing from %v to %v, "+
"this may lead to unexpected behavior and configuration issues.", instance.Plugin, plugin)
}

instance.Plugin = plugin
instance.Config = cfg
instance.UpdatedAt = time.Now()

Expand Down
Loading

0 comments on commit 977b037

Please sign in to comment.