Skip to content

Commit

Permalink
feat: support creating config patches in the infrastructure providers
Browse files Browse the repository at this point in the history
Patches can be created for a single machine in the machine provision
flow: the provider can call `CreateConfigPatch` method at any point.
This will create a `ConfigPatchRequest` resource which will be turned
into a `ConfigPatch` after the `MachineRequestStatus` UUID gets
populated.

Fixes: #728

Signed-off-by: Artem Chernyshev <[email protected]>
  • Loading branch information
Unix4ever committed Nov 6, 2024
1 parent 3e8bc8d commit fe0fc17
Show file tree
Hide file tree
Showing 11 changed files with 336 additions and 2 deletions.
57 changes: 57 additions & 0 deletions client/pkg/infra/controllers/provision.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ func (ctrl *ProvisionController[T]) Settings() controller.QSettings {
Kind: controller.InputQMapped,
ID: optional.Some(siderolink.ConfigID),
},
{
Namespace: resources.InfraProviderNamespace,
Type: infra.ConfigPatchRequestType,
Kind: controller.InputQMappedDestroyReady,
},
{
Namespace: t.ResourceDefinition().DefaultNamespace,
Type: t.ResourceDefinition().Type,
Expand All @@ -91,6 +96,10 @@ func (ctrl *ProvisionController[T]) Settings() controller.QSettings {
Kind: controller.OutputShared,
Type: t.ResourceDefinition().Type,
},
{
Kind: controller.OutputShared,
Type: infra.ConfigPatchRequestType,
},
},
Concurrency: optional.Some(ctrl.concurrency),
}
Expand Down Expand Up @@ -214,6 +223,7 @@ func (ctrl *ProvisionController[T]) reconcileRunning(ctx context.Context, r cont
st,
connectionParams,
ctrl.imageFactory,
r,
))

st.Metadata().Annotations().Set(currentStepAnnotation, step.Name())
Expand Down Expand Up @@ -259,6 +269,41 @@ func (ctrl *ProvisionController[T]) reconcileRunning(ctx context.Context, r cont
return nil
}

func (ctrl *ProvisionController[T]) removePatches(ctx context.Context, r controller.QRuntime, requestID string) (bool, error) {
destroyReady := true

patches, err := safe.ReaderListAll[*infra.ConfigPatchRequest](ctx, r, state.WithLabelQuery(
resource.LabelEqual(omni.LabelInfraProviderID, ctrl.providerID),
resource.LabelEqual(omni.LabelMachineRequest, requestID),
))
if err != nil {
return false, err
}

for request := range patches.All() {
ready, err := r.Teardown(ctx, request.Metadata())
if err != nil {
if state.IsNotFoundError(err) {
continue
}

return false, err
}

if !ready {
destroyReady = false

continue
}

if err = r.Destroy(ctx, request.Metadata()); err != nil && !state.IsNotFoundError(err) {
return false, err
}
}

return destroyReady, nil
}

func (ctrl *ProvisionController[T]) initializeStatus(ctx context.Context, r controller.QRuntime, logger *zap.Logger, machineRequest *infra.MachineRequest) (*infra.MachineRequestStatus, error) {
mrs, err := safe.ReaderGetByID[*infra.MachineRequestStatus](ctx, r, machineRequest.Metadata().ID())
if err != nil && !state.IsNotFoundError(err) {
Expand Down Expand Up @@ -309,6 +354,18 @@ func (ctrl *ProvisionController[T]) reconcileTearingDown(ctx context.Context, r
return err
}

{
var ready bool

if ready, err = ctrl.removePatches(ctx, r, machineRequest.Metadata().ID()); err != nil {
return err
}

if !ready {
return nil
}
}

resources := []resource.Metadata{
resource.NewMetadata(t.ResourceDefinition().DefaultNamespace, t.ResourceDefinition().Type, machineRequest.Metadata().ID(), resource.VersionUndefined),
*infra.NewMachineRequestStatus(machineRequest.Metadata().ID()).Metadata(),
Expand Down
14 changes: 14 additions & 0 deletions client/pkg/infra/infra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ func (p *provisioner) ProvisionSteps() []provision.Step[*TestResource] {

return nil
}),
provision.NewStep("patches", func(ctx context.Context, _ *zap.Logger, pctx provision.Context[*TestResource]) error {
return pctx.CreateConfigPatch(ctx, pctx.GetRequestID(), []byte("machine: {}"))
}),
provision.NewStep("schematic", genSchematic),
provision.NewStep("validate", validateConnectionParams),
provision.NewStep("provision", func(ctx context.Context, _ *zap.Logger, pctx provision.Context[*TestResource]) error {
Expand Down Expand Up @@ -269,6 +272,8 @@ func TestInfra(t *testing.T) {
machineRequest.Metadata().Labels().Set(omni.LabelInfraProviderID, providerID)
machineRequest.Metadata().Labels().Set(customLabel, customValue)

patchID := machineRequest.Metadata().ID()

require.NoError(t, state.Create(ctx, machineRequest))

connectionParams := siderolink.NewConnectionParams(resources.DefaultNamespace, siderolink.ConfigID)
Expand Down Expand Up @@ -299,6 +304,13 @@ func TestInfra(t *testing.T) {
assert.Equal(specs.MachineRequestStatusSpec_PROVISIONED, machineRequestStatus.TypedSpec().Value.Stage)
})

rtestutils.AssertResources(ctx, t, state, []string{patchID}, func(r *infrares.ConfigPatchRequest, assert *assert.Assertions) {
data, err := r.TypedSpec().Value.GetUncompressedData()

assert.NoError(err)
assert.EqualValues([]byte("machine: {}"), data.Data())
})

rtestutils.AssertResources(ctx, t, state, []string{machineRequest.Metadata().ID()}, func(testResource *TestResource, assert *assert.Assertions) {
assert.True(testResource.TypedSpec().Value.Connected)
})
Expand All @@ -311,6 +323,8 @@ func TestInfra(t *testing.T) {
rtestutils.AssertNoResource[*TestResource](ctx, t, state, machineRequest.Metadata().ID())

require.Nil(t, p.getMachine(machineRequest.Metadata().ID()))

rtestutils.AssertNoResource[*infrares.ConfigPatchRequest](ctx, t, state, patchID)
}

func setupInfra(ctx context.Context, t *testing.T, p *provisioner) state.State {
Expand Down
24 changes: 24 additions & 0 deletions client/pkg/infra/provision/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,20 @@ package provision

import (
"context"
"errors"
"slices"
"strings"

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/siderolabs/gen/xslices"
"github.com/siderolabs/image-factory/pkg/schematic"
"go.uber.org/zap"
"gopkg.in/yaml.v3"

"github.com/siderolabs/omni/client/api/omni/specs"
"github.com/siderolabs/omni/client/pkg/omni/resources"
"github.com/siderolabs/omni/client/pkg/omni/resources/infra"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
)
Expand Down Expand Up @@ -90,13 +94,15 @@ func NewContext[T resource.Resource](
state T,
connectionParams ConnectionParams,
imageFactory FactoryClient,
runtime controller.QRuntime,
) Context[T] {
return Context[T]{
machineRequest: machineRequest,
MachineRequestStatus: machineRequestStatus,
State: state,
ConnectionParams: connectionParams,
imageFactory: imageFactory,
runtime: runtime,
}
}

Expand All @@ -105,6 +111,7 @@ type Context[T resource.Resource] struct {
machineRequest *infra.MachineRequest
imageFactory FactoryClient
MachineRequestStatus *infra.MachineRequestStatus
runtime controller.QRuntime
State T
ConnectionParams ConnectionParams
}
Expand Down Expand Up @@ -134,6 +141,23 @@ func (context *Context[T]) UnmarshalProviderData(dest any) error {
return yaml.Unmarshal([]byte(context.machineRequest.TypedSpec().Value.ProviderData), dest)
}

// CreateConfigPatch for the provisioned machine.
func (context *Context[T]) CreateConfigPatch(ctx context.Context, name string, data []byte) error {
r := infra.NewConfigPatchRequest(resources.InfraProviderNamespace, name)

providerID, ok := context.machineRequest.Metadata().Labels().Get(omni.LabelInfraProviderID)
if !ok {
return errors.New("infra provider id is not set on the machine request")
}

return safe.WriterModify(ctx, context.runtime, r, func(r *infra.ConfigPatchRequest) error {
r.Metadata().Labels().Set(omni.LabelInfraProviderID, providerID)
r.Metadata().Labels().Set(omni.LabelMachineRequest, context.GetRequestID())

return r.TypedSpec().Value.SetUncompressedData(data)
})
}

// GenerateSchematicID generate the final schematic out of the machine request.
// This method also calls the image factory and uploads the schematic there.
func (context *Context[T]) GenerateSchematicID(ctx context.Context, logger *zap.Logger, opts ...SchematicOption) (string, error) {
Expand Down
50 changes: 50 additions & 0 deletions client/pkg/omni/resources/infra/config_patch_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package infra

import (
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/resource/meta"
"github.com/cosi-project/runtime/pkg/resource/protobuf"
"github.com/cosi-project/runtime/pkg/resource/typed"

"github.com/siderolabs/omni/client/api/omni/specs"
"github.com/siderolabs/omni/client/pkg/omni/resources"
)

// NewConfigPatchRequest creates new ConfigPatchRequest resource.
func NewConfigPatchRequest(ns string, id resource.ID) *ConfigPatchRequest {
return typed.NewResource[ConfigPatchRequestSpec, ConfigPatchRequestExtension](
resource.NewMetadata(ns, ConfigPatchRequestType, id, resource.VersionUndefined),
protobuf.NewResourceSpec(&specs.ConfigPatchSpec{}),
)
}

const (
// ConfigPatchRequestType is the type of the ConfigPatch resource.
// tsgen:ConfigPatchRequestType
ConfigPatchRequestType = resource.Type("ConfigPatchRequests.omni.sidero.dev")
)

// ConfigPatchRequest requests a config patch to be created for the machine.
// The controller should copy this resource contents to the target config patch, if the patch is valid.
type ConfigPatchRequest = typed.Resource[ConfigPatchRequestSpec, ConfigPatchRequestExtension]

// ConfigPatchRequestSpec wraps specs.ConfigPatchRequestSpec.
type ConfigPatchRequestSpec = protobuf.ResourceSpec[specs.ConfigPatchSpec, *specs.ConfigPatchSpec]

// ConfigPatchRequestExtension provides auxiliary methods for ConfigPatch resource.
type ConfigPatchRequestExtension struct{}

// ResourceDefinition implements [typed.Extension] interface.
func (ConfigPatchRequestExtension) ResourceDefinition() meta.ResourceDefinitionSpec {
return meta.ResourceDefinitionSpec{
Type: ConfigPatchRequestType,
Aliases: []resource.Type{},
DefaultNamespace: resources.InfraProviderNamespace,
PrintColumns: []meta.PrintColumn{},
Sensitivity: meta.Sensitive,
}
}
1 change: 1 addition & 0 deletions client/pkg/omni/resources/infra/infra.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ func init() {
registry.MustRegisterResource(MachineRequestType, &MachineRequest{})
registry.MustRegisterResource(MachineRequestStatusType, &MachineRequestStatus{})
registry.MustRegisterResource(InfraProviderStatusType, &ProviderStatus{})
registry.MustRegisterResource(ConfigPatchRequestType, &ConfigPatchRequest{})
}
1 change: 1 addition & 0 deletions cmd/integration-test/pkg/tests/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -1042,6 +1042,7 @@ func AssertResourceAuthz(rootCtx context.Context, rootCli *client.Client, client
delete(untestedResourceTypes, infra.MachineRequestType)
delete(untestedResourceTypes, infra.MachineRequestStatusType)
delete(untestedResourceTypes, infra.InfraProviderStatusType)
delete(untestedResourceTypes, infra.ConfigPatchRequestType)

for _, tc := range testCases {
for _, testVerb := range allVerbs {
Expand Down
4 changes: 2 additions & 2 deletions cmd/integration-test/pkg/tests/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func AssertStatsLimits(testCtx context.Context) TestFunc {
{
name: "resource CRUD",
query: `sum(omni_resource_operations_total{operation=~"create|update", type!="MachineStatusLinks.omni.sidero.dev"})`,
check: func(assert *assert.Assertions, value float64) { assert.Less(value, float64(10000)) },
check: func(assert *assert.Assertions, value float64) { assert.Less(value, float64(11000)) },
},
{
name: "queue length",
Expand All @@ -45,7 +45,7 @@ func AssertStatsLimits(testCtx context.Context) TestFunc {
{
name: "controller wakeups",
query: `sum(omni_runtime_controller_wakeups{controller!="MachineStatusLinkController"})`,
check: func(assert *assert.Assertions, value float64) { assert.Less(value, float64(10000)) },
check: func(assert *assert.Assertions, value float64) { assert.Less(value, float64(11000)) },
},
} {
t.Run(tt.name, func(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) 2024 Sidero Labs, Inc.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.

package omni

import (
"context"
"errors"

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/controller/generic/qtransform"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/gen/xerrors"
"go.uber.org/zap"

"github.com/siderolabs/omni/client/pkg/omni/resources"
"github.com/siderolabs/omni/client/pkg/omni/resources/infra"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
"github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/helpers"
)

// InfraProviderConfigPatchController manages endpoints for each Cluster.
type InfraProviderConfigPatchController = qtransform.QController[*infra.ConfigPatchRequest, *omni.ConfigPatch]

// NewInfraProviderConfigPatchController initializes ConfigPatchRequestController.
func NewInfraProviderConfigPatchController() *InfraProviderConfigPatchController {
return qtransform.NewQController(
qtransform.Settings[*infra.ConfigPatchRequest, *omni.ConfigPatch]{
Name: "ConfigPatchRequestController",
MapMetadataFunc: func(request *infra.ConfigPatchRequest) *omni.ConfigPatch {
return omni.NewConfigPatch(resources.DefaultNamespace, request.Metadata().ID())
},
UnmapMetadataFunc: func(configPatch *omni.ConfigPatch) *infra.ConfigPatchRequest {
return infra.NewConfigPatchRequest(resources.DefaultNamespace, configPatch.Metadata().ID())
},
TransformFunc: func(ctx context.Context, r controller.Reader, _ *zap.Logger, request *infra.ConfigPatchRequest, patch *omni.ConfigPatch) error {
machineRequestID, ok := request.Metadata().Labels().Get(omni.LabelMachineRequest)
if !ok {
return xerrors.NewTaggedf[qtransform.DestroyOutputTag]("missing machine request label on the patch request")
}

machineRequestStatus, err := safe.ReaderGetByID[*infra.MachineRequestStatus](ctx, r, machineRequestID)
if err != nil {
if state.IsNotFoundError(err) {
return xerrors.NewTaggedf[qtransform.DestroyOutputTag]("machine request status with id %q doesn't exist", machineRequestID)
}

return err
}

if machineRequestStatus.TypedSpec().Value.Id == "" {
return errors.New("failed to create config patch from the request: machine request status doesn't have machine UUID")
}

patch.TypedSpec().Value = request.TypedSpec().Value

helpers.CopyAllLabels(request, patch)

patch.Metadata().Labels().Set(omni.LabelSystemPatch, "")
patch.Metadata().Labels().Set(omni.LabelMachine, machineRequestStatus.TypedSpec().Value.Id)

return nil
},
},
qtransform.WithExtraMappedInput(
func(ctx context.Context, _ *zap.Logger, r controller.QRuntime, machineRequestStatus *infra.MachineRequestStatus) ([]resource.Pointer, error) {
patchRequests, err := safe.ReaderListAll[*infra.ConfigPatchRequest](ctx, r, state.WithLabelQuery(
resource.LabelEqual(omni.LabelMachineRequest, machineRequestStatus.Metadata().ID())),
)
if err != nil {
return nil, err
}

return safe.ToSlice(patchRequests, func(r *infra.ConfigPatchRequest) resource.Pointer { return r.Metadata() }), nil
},
),
qtransform.WithOutputKind(controller.OutputShared),
)
}
Loading

0 comments on commit fe0fc17

Please sign in to comment.