Skip to content

Commit

Permalink
dynamic host volumes: node selection via constraints
Browse files Browse the repository at this point in the history
When making a request to create a dynamic host volumes, users can pass a node
pool and constraints instead of a specific node ID.

This changeset implements a node scheduling logic by instantiating a filter by
node pool and constraint checker borrowed from the scheduler package. Because
host volumes with the same name can't land on the same host, we don't need to
support `distinct_hosts`/`distinct_property`; this would be challenging anyways
without building out a much larger node iteration mechanism to keep track of
usage across multiple hosts.

Ref: #24479
  • Loading branch information
tgross committed Nov 20, 2024
1 parent 23c19e5 commit 11d281e
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 46 deletions.
1 change: 1 addition & 0 deletions command/agent/host_volume_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestHostVolumeEndpoint_CRUD(t *testing.T) {
// Create a volume on the test node

vol := mock.HostVolumeRequest(structs.DefaultNamespace)
vol.Constraints = nil
reqBody := struct {
Volumes []*structs.HostVolume
}{Volumes: []*structs.HostVolume{vol}}
Expand Down
10 changes: 4 additions & 6 deletions command/volume_create_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ import (
"github.com/hashicorp/hcl"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/command/agent"
"github.com/mitchellh/cli"
"github.com/shoenig/test/must"
)

func TestHostVolumeCreateCommand_Run(t *testing.T) {
ci.Parallel(t)
srv, client, url := testServer(t, true, nil)
srv, client, url := testServer(t, true, func(c *agent.Config) {
c.Client.Meta = map[string]string{"rack": "foo"}
})
t.Cleanup(srv.Shutdown)

waitForNodes(t, client)
Expand All @@ -38,11 +41,6 @@ node_pool = "default"
capacity_min = "10GiB"
capacity_max = "20G"
constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}
constraint {
attribute = "${meta.rack}"
value = "foo"
Expand Down
103 changes: 81 additions & 22 deletions nomad/host_volume_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package nomad
import (
"fmt"
"net/http"
"regexp"
"strings"
"time"

Expand All @@ -19,6 +20,7 @@ import (
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/state/paginator"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
)

// HostVolume is the server RPC endpoint for host volumes
Expand Down Expand Up @@ -425,28 +427,11 @@ func (v *HostVolume) validateVolumeForState(vol *structs.HostVolume, snap *state

func (v *HostVolume) createVolume(vol *structs.HostVolume) error {

// TODO(1.10.0): proper node selection based on constraints and node
// pool. Also, should we move this into the validator step?
if vol.NodeID == "" {
var iter memdb.ResultIterator
var err error
var raw any
if vol.NodePool != "" {
iter, err = v.srv.State().NodesByNodePool(nil, vol.NodePool)
} else {
iter, err = v.srv.State().Nodes(nil)
}
if err != nil {
return err
}
raw = iter.Next()
if raw == nil {
return fmt.Errorf("no node meets constraints for volume")
}

node := raw.(*structs.Node)
vol.NodeID = node.ID
node, err := v.placeHostVolume(vol)
if err != nil {
return fmt.Errorf("could not place volume %q: %w", vol.Name, err)
}
vol.NodeID = node.ID

method := "ClientHostVolume.Create"
cReq := &cstructs.ClientHostVolumeCreateRequest{
Expand All @@ -459,7 +444,7 @@ func (v *HostVolume) createVolume(vol *structs.HostVolume) error {
Parameters: vol.Parameters,
}
cResp := &cstructs.ClientHostVolumeCreateResponse{}
err := v.srv.RPC(method, cReq, cResp)
err = v.srv.RPC(method, cReq, cResp)
if err != nil {
return err
}
Expand All @@ -474,6 +459,80 @@ func (v *HostVolume) createVolume(vol *structs.HostVolume) error {
return nil
}

// placeHostVolume finds a node that matches the node pool and constraints,
// which doesn't already have a volume by that name. It returns a non-nil Node
// or an error indicating placement failed.
func (v *HostVolume) placeHostVolume(vol *structs.HostVolume) (*structs.Node, error) {

var iter memdb.ResultIterator
var err error
if vol.NodePool != "" {
iter, err = v.srv.State().NodesByNodePool(nil, vol.NodePool)
} else {
iter, err = v.srv.State().Nodes(nil)
}
if err != nil {
return nil, err
}

var checker *scheduler.ConstraintChecker

if len(vol.Constraints) > 0 {
ctx := &placementContext{
regexpCache: make(map[string]*regexp.Regexp),
versionCache: make(map[string]scheduler.VerConstraints),
semverCache: make(map[string]scheduler.VerConstraints),
}
checker = scheduler.NewConstraintChecker(ctx, vol.Constraints)
}

for {
raw := iter.Next()
if raw == nil {
break
}
candidate := raw.(*structs.Node)

// note: this is a race if multiple users create volumes of the same
// name concurrently, but we can't solve it on the server because we
// haven't yet written to state. The client will reject requests to
// create/register a volume with the same name with a different ID.
if _, hasVol := candidate.HostVolumes[vol.Name]; hasVol {
continue
}

if checker != nil {
if ok := checker.Feasible(candidate); !ok {
continue
}
}

return candidate, nil
}

return nil, fmt.Errorf("no node meets constraints")
}

// placementContext implements the scheduler.ConstraintContext interface, a
// minimal subset of the scheduler.Context interface that we need to create a
// feasibility checker for constraints
type placementContext struct {
regexpCache map[string]*regexp.Regexp
versionCache map[string]scheduler.VerConstraints
semverCache map[string]scheduler.VerConstraints
}

func (ctx *placementContext) Metrics() *structs.AllocMetric { return &structs.AllocMetric{} }
func (ctx *placementContext) RegexpCache() map[string]*regexp.Regexp { return ctx.regexpCache }

func (ctx *placementContext) VersionConstraintCache() map[string]scheduler.VerConstraints {
return ctx.versionCache
}

func (ctx *placementContext) SemverConstraintCache() map[string]scheduler.VerConstraints {
return ctx.semverCache
}

func (v *HostVolume) Delete(args *structs.HostVolumeDeleteRequest, reply *structs.HostVolumeDeleteResponse) error {

authErr := v.srv.Authenticate(v.ctx, args)
Expand Down
118 changes: 118 additions & 0 deletions nomad/host_volume_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
Expand Down Expand Up @@ -156,6 +157,25 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) {
must.EqError(t, err, "Permission denied")
})

t.Run("invalid node constraints", func(t *testing.T) {
req.Volumes[0].Constraints[0].RTarget = "r2"
req.Volumes[1].Constraints[0].RTarget = "r2"

defer func() {
req.Volumes[0].Constraints[0].RTarget = "r1"
req.Volumes[1].Constraints[0].RTarget = "r1"
}()

var resp structs.HostVolumeCreateResponse
req.AuthToken = token
err := msgpackrpc.CallWithCodec(codec, "HostVolume.Create", req, &resp)
must.EqError(t, err, `2 errors occurred:
* could not place volume "example1": no node meets constraints
* could not place volume "example2": no node meets constraints
`)
})

t.Run("valid create", func(t *testing.T) {
var resp structs.HostVolumeCreateResponse
req.AuthToken = token
Expand Down Expand Up @@ -611,6 +631,103 @@ func TestHostVolumeEndpoint_List(t *testing.T) {
})
}

func TestHostVolumeEndpoint_placeVolume(t *testing.T) {
srv, _, cleanupSrv := TestACLServer(t, func(c *Config) {
c.NumSchedulers = 0
})
t.Cleanup(cleanupSrv)
testutil.WaitForLeader(t, srv.RPC)
store := srv.fsm.State()

endpoint := &HostVolume{
srv: srv,
logger: testlog.HCLogger(t),
}

node0, node1, node2, node3 := mock.Node(), mock.Node(), mock.Node(), mock.Node()
node0.NodePool = structs.NodePoolDefault
node1.NodePool = "dev"
node1.Meta["rack"] = "r2"
node2.NodePool = "prod"
node3.NodePool = "prod"
node3.Meta["rack"] = "r3"
node3.HostVolumes = map[string]*structs.ClientHostVolumeConfig{"example": {
Name: "example",
Path: "/srv",
}}

must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node0))
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node1))
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node2))
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node3))

testCases := []struct {
name string
vol *structs.HostVolume
expect *structs.Node
expectErr string
}{
{
name: "only one in node pool",
vol: &structs.HostVolume{NodePool: "default"},
expect: node0,
},
{
name: "only one that matches constraints",
vol: &structs.HostVolume{Constraints: []*structs.Constraint{
{
LTarget: "${meta.rack}",
RTarget: "r2",
Operand: "=",
},
}},
expect: node1,
},
{
name: "only one available in pool",
vol: &structs.HostVolume{NodePool: "prod", Name: "example"},
expect: node2,
},
{
name: "no match",
vol: &structs.HostVolume{Constraints: []*structs.Constraint{
{
LTarget: "${meta.rack}",
RTarget: "r6",
Operand: "=",
},
}},
expectErr: "no node meets constraints",
},
{
name: "match is not available for placement",
vol: &structs.HostVolume{
Name: "example",
Constraints: []*structs.Constraint{
{
LTarget: "${meta.rack}",
RTarget: "r3",
Operand: "=",
},
}},
expectErr: "no node meets constraints",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
node, err := endpoint.placeHostVolume(tc.vol)
if tc.expectErr == "" {
must.NoError(t, err)
must.Eq(t, tc.expect, node)
} else {
must.EqError(t, err, tc.expectErr)
must.Nil(t, node)
}
})
}
}

// mockHostVolumeClient models client RPCs that have side-effects on the
// client host
type mockHostVolumeClient struct {
Expand All @@ -631,6 +748,7 @@ func newMockHostVolumeClient(t *testing.T, srv *Server, pool string) (*mockHostV
c.Node.NodePool = pool
// TODO(1.10.0): we'll want to have a version gate for this feature
c.Node.Attributes["nomad.version"] = version.Version
c.Node.Meta["rack"] = "r1"
}, srv.config.RPCAddr, map[string]any{"HostVolume": mockClientEndpoint})
t.Cleanup(cleanup)

Expand Down
6 changes: 6 additions & 0 deletions nomad/structs/host_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ func (hv *HostVolume) Validate() error {
if err := constraint.Validate(); err != nil {
mErr = multierror.Append(mErr, fmt.Errorf("invalid constraint: %v", err))
}
switch constraint.Operand {
case ConstraintDistinctHosts, ConstraintDistinctProperty:
mErr = multierror.Append(mErr, fmt.Errorf(
"invalid constraint %s: host volumes of the same name are always on distinct hosts", constraint.Operand))
default:
}
}

return mErr.ErrorOrNil()
Expand Down
7 changes: 7 additions & 0 deletions scheduler/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ type Context interface {
SendEvent(event interface{})
}

type ConstraintContext interface {
Metrics() *structs.AllocMetric
RegexpCache() map[string]*regexp.Regexp
VersionConstraintCache() map[string]VerConstraints
SemverConstraintCache() map[string]VerConstraints
}

// EvalCache is used to cache certain things during an evaluation
type EvalCache struct {
reCache map[string]*regexp.Regexp
Expand Down
Loading

0 comments on commit 11d281e

Please sign in to comment.