csi: use node MaxVolumes during scheduling (#7565)
* nomad/state/state_store: CSIVolumesByNodeID ignores namespace
* scheduler/scheduler: add CSIVolumesByNodeID to the state interface
* scheduler/feasible: check node MaxVolumes
* nomad/csi_endpoint: no namespace in CSIVolumesByNodeID anymore
* nomad/state/state_store: avoid DenormalizeAllocationSlice
* nomad/state/iterator: clean up SliceIterator Next
* scheduler/feasible_test: block with MaxVolumes
* nomad/state/state_store_test: fix args to CSIVolumesByNodeID
parent 8d4f39fba1
commit e03c328792
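At a high level, this change makes the scheduler count the CSI volumes already in use per plugin on each candidate node and reject nodes whose plugins report they are at capacity. The following is a minimal, self-contained sketch of that counting check; the types and names (volume, nodeIsFeasible, the maps in main) are illustrative stand-ins, not Nomad's actual structs, where the per-plugin limit corresponds to CSINodeInfo.MaxVolumes and the in-use volumes come from CSIVolumesByNodeID.

// Minimal sketch of the MaxVolumes feasibility idea, with simplified
// placeholder types. Nomad also checks plugin presence and health first;
// this only shows the counting step.
package main

import "fmt"

type volume struct {
	PluginID string
}

func nodeIsFeasible(volumesOnNode []volume, maxVolumes map[string]int64, requestedPlugins []string) (bool, string) {
	// Count how many volumes each plugin already has on this node.
	pluginCount := map[string]int64{}
	for _, v := range volumesOnNode {
		pluginCount[v.PluginID]++
	}
	// Reject the node if any requested plugin is already at its limit.
	for _, p := range requestedPlugins {
		if pluginCount[p] >= maxVolumes[p] {
			return false, fmt.Sprintf("CSI plugin %s has the maximum number of volumes on this client", p)
		}
	}
	return true, ""
}

func main() {
	ok, reason := nodeIsFeasible(
		[]volume{{PluginID: "foo"}}, // one "foo" volume already in use on the node
		map[string]int64{"foo": 1},  // the "foo" plugin reports a limit of 1
		[]string{"foo"},             // the placement asks for another "foo" volume
	)
	fmt.Println(ok, reason) // false, node is at capacity for plugin "foo"
}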
nomad/csi_endpoint.go

@@ -121,7 +121,7 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV
 	var iter memdb.ResultIterator
 
 	if args.NodeID != "" {
-		iter, err = state.CSIVolumesByNodeID(ws, ns, args.NodeID)
+		iter, err = state.CSIVolumesByNodeID(ws, args.NodeID)
 	} else if args.PluginID != "" {
 		iter, err = state.CSIVolumesByPluginID(ws, ns, args.PluginID)
 	} else {
@@ -147,11 +147,16 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV
 			return err
 		}
 
-		// Filter (possibly again) on PluginID to handle passing both NodeID and PluginID
+		// Remove (possibly again) by PluginID to handle passing both NodeID and PluginID
		if args.PluginID != "" && args.PluginID != vol.PluginID {
			continue
		}
 
+		// Remove by Namespace, since CSIVolumesByNodeID hasn't used the Namespace yet
+		if vol.Namespace != ns {
+			continue
+		}
+
		vs = append(vs, vol.Stub())
	}
	reply.Volumes = vs
nomad/state/iterator.go

@@ -20,9 +20,10 @@ func (i *SliceIterator) Next() interface{} {
	if i.idx == len(i.data) {
		return nil
	}
-	idx := i.idx
+
+	datum := i.data[i.idx]
	i.idx += 1
-	return i.data[idx]
+	return datum
 }
 
 func (i *SliceIterator) WatchCh() <-chan struct{} {
nomad/state/state_store.go

@@ -1868,23 +1868,14 @@ func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID
 }
 
 // CSIVolumesByNodeID looks up CSIVolumes in use on a node
-func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, namespace, nodeID string) (memdb.ResultIterator, error) {
+func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, nodeID string) (memdb.ResultIterator, error) {
	allocs, err := s.AllocsByNode(ws, nodeID)
	if err != nil {
		return nil, fmt.Errorf("alloc lookup failed: %v", err)
	}
-	snap, err := s.Snapshot()
-	if err != nil {
-		return nil, fmt.Errorf("alloc lookup failed: %v", err)
-	}
-
-	allocs, err = snap.DenormalizeAllocationSlice(allocs)
-	if err != nil {
-		return nil, fmt.Errorf("alloc lookup failed: %v", err)
-	}
 
	// Find volume ids for CSI volumes in running allocs, or allocs that we desire to run
-	ids := map[string]struct{}{}
+	ids := map[string]string{} // Map volumeID to Namespace
	for _, a := range allocs {
		tg := a.Job.LookupTaskGroup(a.TaskGroup)
 
@@ -1898,14 +1889,14 @@ func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, namespace, nodeID str
			if v.Type != structs.VolumeTypeCSI {
				continue
			}
-			ids[v.Source] = struct{}{}
+			ids[v.Source] = a.Namespace
		}
	}
 
	// Lookup the raw CSIVolumes to match the other list interfaces
	iter := NewSliceIterator()
	txn := s.db.Txn(false)
-	for id := range ids {
+	for id, namespace := range ids {
		raw, err := txn.First("csi_volumes", "id", namespace, id)
		if err != nil {
			return nil, fmt.Errorf("volume lookup failed: %s %v", id, err)
@@ -5067,6 +5058,8 @@ func (s *StateSnapshot) DenormalizeAllocationsMap(nodeAllocations map[string][]*
 // DenormalizeAllocationSlice queries the Allocation for each allocation diff
 // represented as an Allocation and merges the updated attributes with the existing
 // Allocation, and attaches the Job provided.
+//
+// This should only be called on terminal allocs, particularly stopped or preempted allocs
 func (s *StateSnapshot) DenormalizeAllocationSlice(allocs []*structs.Allocation) ([]*structs.Allocation, error) {
	allocDiffs := make([]*structs.AllocationDiff, len(allocs))
	for i, alloc := range allocs {
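Since CSIVolumesByNodeID no longer takes a namespace, a caller that needs namespace scoping filters the results itself, as the List endpoint change at the top of this commit does. A minimal sketch of that consumption pattern follows; the package name and the helper csiVolumesForNode are illustrative, not part of Nomad, and the body simply mirrors the diff rather than adding new behavior.

package example // illustrative helper, not part of Nomad

import (
	memdb "github.com/hashicorp/go-memdb"

	"github.com/hashicorp/nomad/nomad/state"
	"github.com/hashicorp/nomad/nomad/structs"
)

// csiVolumesForNode is a hypothetical helper showing the new call shape:
// CSIVolumesByNodeID takes only a node ID, so namespace filtering happens here.
func csiVolumesForNode(store *state.StateStore, ns, nodeID string) ([]*structs.CSIVolume, error) {
	ws := memdb.NewWatchSet()
	iter, err := store.CSIVolumesByNodeID(ws, nodeID)
	if err != nil {
		return nil, err
	}
	var vols []*structs.CSIVolume
	for raw := iter.Next(); raw != nil; raw = iter.Next() {
		vol, ok := raw.(*structs.CSIVolume)
		if !ok {
			continue
		}
		// The state store no longer scopes results by namespace; do it here.
		if vol.Namespace != ns {
			continue
		}
		vols = append(vols, vol)
	}
	return vols, nil
}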
nomad/state/state_store_test.go

@@ -2936,7 +2936,7 @@ func TestStateStore_CSIVolume(t *testing.T) {
	require.Equal(t, 1, len(vs))
 
	ws = memdb.NewWatchSet()
-	iter, err = state.CSIVolumesByNodeID(ws, ns, node.ID)
+	iter, err = state.CSIVolumesByNodeID(ws, node.ID)
	require.NoError(t, err)
	vs = slurp(iter)
	require.Equal(t, 1, len(vs))
scheduler/feasible.go

@@ -15,16 +15,17 @@ import (
 )
 
 const (
-	FilterConstraintHostVolumes                = "missing compatible host volumes"
-	FilterConstraintCSIPluginTemplate          = "CSI plugin %s is missing from client %s"
-	FilterConstraintCSIPluginUnhealthyTemplate = "CSI plugin %s is unhealthy on client %s"
-	FilterConstraintCSIVolumesLookupFailed     = "CSI volume lookup failed"
-	FilterConstraintCSIVolumeNotFoundTemplate  = "missing CSI Volume %s"
-	FilterConstraintCSIVolumeNoReadTemplate    = "CSI volume %s is unschedulable or has exhausted its available reader claims"
-	FilterConstraintCSIVolumeNoWriteTemplate   = "CSI volume %s is unschedulable or is read-only"
-	FilterConstraintCSIVolumeInUseTemplate     = "CSI volume %s has exhausted its available writer claims" //
-	FilterConstraintDrivers                    = "missing drivers"
-	FilterConstraintDevices                    = "missing devices"
+	FilterConstraintHostVolumes                 = "missing compatible host volumes"
+	FilterConstraintCSIPluginTemplate           = "CSI plugin %s is missing from client %s"
+	FilterConstraintCSIPluginUnhealthyTemplate  = "CSI plugin %s is unhealthy on client %s"
+	FilterConstraintCSIPluginMaxVolumesTemplate = "CSI plugin %s has the maximum number of volumes on client %s"
+	FilterConstraintCSIVolumesLookupFailed      = "CSI volume lookup failed"
+	FilterConstraintCSIVolumeNotFoundTemplate   = "missing CSI Volume %s"
+	FilterConstraintCSIVolumeNoReadTemplate     = "CSI volume %s is unschedulable or has exhausted its available reader claims"
+	FilterConstraintCSIVolumeNoWriteTemplate    = "CSI volume %s is unschedulable or is read-only"
+	FilterConstraintCSIVolumeInUseTemplate      = "CSI volume %s has exhausted its available writer claims" //
+	FilterConstraintDrivers                     = "missing drivers"
+	FilterConstraintDevices                     = "missing devices"
 )
 
 // FeasibleIterator is used to iteratively yield nodes that
@@ -247,6 +248,26 @@ func (c *CSIVolumeChecker) hasPlugins(n *structs.Node) (bool, string) {
	}
 
	ws := memdb.NewWatchSet()
+
+	// Find the count per plugin for this node, so that can enforce MaxVolumes
+	pluginCount := map[string]int64{}
+	iter, err := c.ctx.State().CSIVolumesByNodeID(ws, n.ID)
+	if err != nil {
+		return false, FilterConstraintCSIVolumesLookupFailed
+	}
+	for {
+		raw := iter.Next()
+		if raw == nil {
+			break
+		}
+		vol, ok := raw.(*structs.CSIVolume)
+		if !ok {
+			continue
+		}
+		pluginCount[vol.PluginID] += 1
+	}
+
+	// For volume requests, find volumes and determine feasibility
	for _, req := range c.volumes {
		vol, err := c.ctx.State().CSIVolumeByID(ws, c.namespace, req.Source)
		if err != nil {
@@ -264,6 +285,9 @@ func (c *CSIVolumeChecker) hasPlugins(n *structs.Node) (bool, string) {
		if !plugin.Healthy {
			return false, fmt.Sprintf(FilterConstraintCSIPluginUnhealthyTemplate, vol.PluginID, n.ID)
		}
+		if pluginCount[vol.PluginID] >= plugin.NodeInfo.MaxVolumes {
+			return false, fmt.Sprintf(FilterConstraintCSIPluginMaxVolumesTemplate, vol.PluginID, n.ID)
+		}
 
		if req.ReadOnly {
			if !vol.ReadSchedulable() {
scheduler/feasible_test.go

@@ -239,6 +239,7 @@ func TestCSIVolumeChecker(t *testing.T) {
		mock.Node(),
		mock.Node(),
		mock.Node(),
+		mock.Node(),
	}
 
	// Register running plugins on some nodes
@@ -253,21 +254,28 @@ func TestCSIVolumeChecker(t *testing.T) {
		"foo": {
			PluginID: "foo",
			Healthy:  true,
-			NodeInfo: &structs.CSINodeInfo{},
+			NodeInfo: &structs.CSINodeInfo{MaxVolumes: 1},
		},
	}
	nodes[1].CSINodePlugins = map[string]*structs.CSIInfo{
		"foo": {
			PluginID: "foo",
			Healthy:  false,
-			NodeInfo: &structs.CSINodeInfo{},
+			NodeInfo: &structs.CSINodeInfo{MaxVolumes: 1},
		},
	}
	nodes[2].CSINodePlugins = map[string]*structs.CSIInfo{
		"bar": {
			PluginID: "bar",
			Healthy:  true,
-			NodeInfo: &structs.CSINodeInfo{},
+			NodeInfo: &structs.CSINodeInfo{MaxVolumes: 1},
		},
	}
+	nodes[4].CSINodePlugins = map[string]*structs.CSIInfo{
+		"foo": {
+			PluginID: "foo",
+			Healthy:  true,
+			NodeInfo: &structs.CSINodeInfo{MaxVolumes: 1},
+		},
+	}
 
@@ -288,6 +296,37 @@ func TestCSIVolumeChecker(t *testing.T) {
	vol.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem
	err := state.CSIVolumeRegister(index, []*structs.CSIVolume{vol})
	require.NoError(t, err)
	index++
+
+	// Create some other volumes in use on nodes[3] to trip MaxVolumes
+	vid2 := uuid.Generate()
+	vol2 := structs.NewCSIVolume(vid2, index)
+	vol2.PluginID = "foo"
+	vol2.Namespace = structs.DefaultNamespace
+	vol2.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter
+	vol2.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem
+	err = state.CSIVolumeRegister(index, []*structs.CSIVolume{vol2})
+	require.NoError(t, err)
+	index++
+
+	alloc := mock.Alloc()
+	alloc.NodeID = nodes[4].ID
+	alloc.Job.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{
+		vid2: {
+			Name:   vid2,
+			Type:   "csi",
+			Source: vid2,
+		},
+	}
+	err = state.UpsertJob(index, alloc.Job)
+	require.NoError(t, err)
+	index++
+	summary := mock.JobSummary(alloc.JobID)
+	require.NoError(t, state.UpsertJobSummary(index, summary))
+	index++
+	err = state.UpsertAllocs(index, []*structs.Allocation{alloc})
+	require.NoError(t, err)
+	index++
+
	// Create volume requests
	noVolumes := map[string]*structs.VolumeRequest{}
@@ -343,6 +382,11 @@ func TestCSIVolumeChecker(t *testing.T) {
			RequestedVolumes: volumes,
			Result:           false,
		},
+		{ // Volumes requested, MaxVolumes exceeded
+			Node:             nodes[4],
+			RequestedVolumes: volumes,
+			Result:           false,
+		},
	}
 
	for i, c := range cases {
scheduler/scheduler.go

@@ -97,6 +97,9 @@ type State interface {
 
	// CSIVolumeByID fetch CSI volumes, containing controller jobs
	CSIVolumeByID(memdb.WatchSet, string, string) (*structs.CSIVolume, error)
+
+	// CSIVolumeByID fetch CSI volumes, containing controller jobs
+	CSIVolumesByNodeID(memdb.WatchSet, string) (memdb.ResultIterator, error)
 }
 
 // Planner interface is used to submit a task allocation plan.
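Because CSIVolumesByNodeID is now part of the scheduler's State interface, any alternative implementation of that interface (for example, a test double) has to provide the new method as well. A hypothetical sketch follows; StubState and the package name are illustrative and not part of Nomad, and the empty SliceIterator comes from nomad/state as shown earlier in this commit.

package schedulerstubs // illustrative package, not part of Nomad

import (
	memdb "github.com/hashicorp/go-memdb"

	"github.com/hashicorp/nomad/nomad/state"
	"github.com/hashicorp/nomad/nomad/structs"
)

// StubState is a hypothetical test double; only the CSI methods are shown.
type StubState struct{}

// CSIVolumeByID satisfies the existing part of the interface.
func (s *StubState) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*structs.CSIVolume, error) {
	return nil, nil
}

// CSIVolumesByNodeID is the method this commit adds to the State interface.
// Returning an empty SliceIterator means "no CSI volumes in use on this node".
func (s *StubState) CSIVolumesByNodeID(ws memdb.WatchSet, nodeID string) (memdb.ResultIterator, error) {
	return state.NewSliceIterator(), nil
}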