Skip to content

Commit

Permalink
dynamic host volumes: node selection via constraints
Browse files Browse the repository at this point in the history
When making a request to create a dynamic host volumes, users can pass a node
pool and constraints instead of a specific node ID. This changeset implements a
node scheduling logic by instantiating a filter by node pool and constraint
checker borrowed from the scheduler package.

Ref: #24479
  • Loading branch information
tgross committed Nov 20, 2024
1 parent 23c19e5 commit 822d291
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 38 deletions.
98 changes: 76 additions & 22 deletions nomad/host_volume_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package nomad
import (
"fmt"
"net/http"
"regexp"
"strings"
"time"

Expand All @@ -19,6 +20,7 @@ import (
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/state/paginator"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
)

// HostVolume is the server RPC endpoint for host volumes
Expand Down Expand Up @@ -425,28 +427,11 @@ func (v *HostVolume) validateVolumeForState(vol *structs.HostVolume, snap *state

func (v *HostVolume) createVolume(vol *structs.HostVolume) error {

// TODO(1.10.0): proper node selection based on constraints and node
// pool. Also, should we move this into the validator step?
if vol.NodeID == "" {
var iter memdb.ResultIterator
var err error
var raw any
if vol.NodePool != "" {
iter, err = v.srv.State().NodesByNodePool(nil, vol.NodePool)
} else {
iter, err = v.srv.State().Nodes(nil)
}
if err != nil {
return err
}
raw = iter.Next()
if raw == nil {
return fmt.Errorf("no node meets constraints for volume")
}

node := raw.(*structs.Node)
vol.NodeID = node.ID
node, err := v.placeHostVolume(vol)
if err != nil {
return fmt.Errorf("could not place volume %q: %w", vol.Name, err)
}
vol.NodeID = node.ID

method := "ClientHostVolume.Create"
cReq := &cstructs.ClientHostVolumeCreateRequest{
Expand All @@ -459,7 +444,7 @@ func (v *HostVolume) createVolume(vol *structs.HostVolume) error {
Parameters: vol.Parameters,
}
cResp := &cstructs.ClientHostVolumeCreateResponse{}
err := v.srv.RPC(method, cReq, cResp)
err = v.srv.RPC(method, cReq, cResp)
if err != nil {
return err
}
Expand All @@ -474,6 +459,75 @@ func (v *HostVolume) createVolume(vol *structs.HostVolume) error {
return nil
}

// placeHostVolume finds a node that matches the node pool and constraints,
// which doesn't already have a volume by that name. It returns a non-nil Node
// or an error indicating placement failed.
func (v *HostVolume) placeHostVolume(vol *structs.HostVolume) (*structs.Node, error) {

var iter memdb.ResultIterator
var err error
if vol.NodePool != "" {
iter, err = v.srv.State().NodesByNodePool(nil, vol.NodePool)
} else {
iter, err = v.srv.State().Nodes(nil)
}
if err != nil {
return nil, err
}

var checker *scheduler.ConstraintChecker

if len(vol.Constraints) > 0 {
ctx := &placementContext{
regexpCache: make(map[string]*regexp.Regexp),
versionCache: make(map[string]scheduler.VerConstraints),
semverCache: make(map[string]scheduler.VerConstraints),
}
checker = scheduler.NewConstraintChecker(ctx, vol.Constraints)
}

for {
raw := iter.Next()
if raw == nil {
break
}
candidate := raw.(*structs.Node)
if _, hasVol := candidate.HostVolumes[vol.Name]; hasVol {
continue
}

if checker != nil {
if ok := checker.Feasible(candidate); !ok {
continue
}
}

return candidate, nil
}

return nil, fmt.Errorf("no node meets constraints")
}

// placementContext implements the scheduler.ConstraintContext interface, a
// minimal subset of the scheduler.Context interface that we need to create a
// feasibility checker for constraints
type placementContext struct {
regexpCache map[string]*regexp.Regexp
versionCache map[string]scheduler.VerConstraints
semverCache map[string]scheduler.VerConstraints
}

func (ctx *placementContext) Metrics() *structs.AllocMetric { return &structs.AllocMetric{} }
func (ctx *placementContext) RegexpCache() map[string]*regexp.Regexp { return ctx.regexpCache }

func (ctx *placementContext) VersionConstraintCache() map[string]scheduler.VerConstraints {
return ctx.versionCache
}

func (ctx *placementContext) SemverConstraintCache() map[string]scheduler.VerConstraints {
return ctx.semverCache
}

func (v *HostVolume) Delete(args *structs.HostVolumeDeleteRequest, reply *structs.HostVolumeDeleteResponse) error {

authErr := v.srv.Authenticate(v.ctx, args)
Expand Down
118 changes: 118 additions & 0 deletions nomad/host_volume_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
Expand Down Expand Up @@ -156,6 +157,25 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) {
must.EqError(t, err, "Permission denied")
})

t.Run("invalid node constraints", func(t *testing.T) {
req.Volumes[0].Constraints[0].RTarget = "r2"
req.Volumes[1].Constraints[0].RTarget = "r2"

defer func() {
req.Volumes[0].Constraints[0].RTarget = "r1"
req.Volumes[1].Constraints[0].RTarget = "r1"
}()

var resp structs.HostVolumeCreateResponse
req.AuthToken = token
err := msgpackrpc.CallWithCodec(codec, "HostVolume.Create", req, &resp)
must.EqError(t, err, `2 errors occurred:
* could not place volume "example1": no node meets constraints
* could not place volume "example2": no node meets constraints
`)
})

t.Run("valid create", func(t *testing.T) {
var resp structs.HostVolumeCreateResponse
req.AuthToken = token
Expand Down Expand Up @@ -611,6 +631,103 @@ func TestHostVolumeEndpoint_List(t *testing.T) {
})
}

func TestHostVolumeEndpoint_placeVolume(t *testing.T) {
srv, _, cleanupSrv := TestACLServer(t, func(c *Config) {
c.NumSchedulers = 0
})
t.Cleanup(cleanupSrv)
testutil.WaitForLeader(t, srv.RPC)
store := srv.fsm.State()

endpoint := &HostVolume{
srv: srv,
logger: testlog.HCLogger(t),
}

node0, node1, node2, node3 := mock.Node(), mock.Node(), mock.Node(), mock.Node()
node0.NodePool = structs.NodePoolDefault
node1.NodePool = "dev"
node1.Meta["rack"] = "r2"
node2.NodePool = "prod"
node3.NodePool = "prod"
node3.Meta["rack"] = "r3"
node3.HostVolumes = map[string]*structs.ClientHostVolumeConfig{"example": {
Name: "example",
Path: "/srv",
}}

must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node0))
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node1))
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node2))
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node3))

testCases := []struct {
name string
vol *structs.HostVolume
expect *structs.Node
expectErr string
}{
{
name: "only one in node pool",
vol: &structs.HostVolume{NodePool: "default"},
expect: node0,
},
{
name: "only one that matches constraints",
vol: &structs.HostVolume{Constraints: []*structs.Constraint{
{
LTarget: "${meta.rack}",
RTarget: "r2",
Operand: "=",
},
}},
expect: node1,
},
{
name: "only one available in pool",
vol: &structs.HostVolume{NodePool: "prod", Name: "example"},
expect: node2,
},
{
name: "no match",
vol: &structs.HostVolume{Constraints: []*structs.Constraint{
{
LTarget: "${meta.rack}",
RTarget: "r6",
Operand: "=",
},
}},
expectErr: "no node meets constraints",
},
{
name: "match is not available for placement",
vol: &structs.HostVolume{
Name: "example",
Constraints: []*structs.Constraint{
{
LTarget: "${meta.rack}",
RTarget: "r3",
Operand: "=",
},
}},
expectErr: "no node meets constraints",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
node, err := endpoint.placeHostVolume(tc.vol)
if tc.expectErr == "" {
must.NoError(t, err)
must.Eq(t, tc.expect, node)
} else {
must.EqError(t, err, tc.expectErr)
must.Nil(t, node)
}
})
}
}

// mockHostVolumeClient models client RPCs that have side-effects on the
// client host
type mockHostVolumeClient struct {
Expand All @@ -631,6 +748,7 @@ func newMockHostVolumeClient(t *testing.T, srv *Server, pool string) (*mockHostV
c.Node.NodePool = pool
// TODO(1.10.0): we'll want to have a version gate for this feature
c.Node.Attributes["nomad.version"] = version.Version
c.Node.Meta["rack"] = "r1"
}, srv.config.RPCAddr, map[string]any{"HostVolume": mockClientEndpoint})
t.Cleanup(cleanup)

Expand Down
7 changes: 7 additions & 0 deletions scheduler/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ type Context interface {
SendEvent(event interface{})
}

type ConstraintContext interface {
Metrics() *structs.AllocMetric
RegexpCache() map[string]*regexp.Regexp
VersionConstraintCache() map[string]VerConstraints
SemverConstraintCache() map[string]VerConstraints
}

// EvalCache is used to cache certain things during an evaluation
type EvalCache struct {
reCache map[string]*regexp.Regexp
Expand Down
Loading

0 comments on commit 822d291

Please sign in to comment.