CSI: use job status not alloc status for plugin updates from summary (#12027)

When an allocation is updated, the job summary for the associated job
is also updated. CSI uses the job summary to set the expected count
for controller and node plugins. We incorrectly used the allocation's
server status instead of the job status when deciding whether to
update or remove the job from the plugins. This caused a node drain or
other terminal state for an allocation to clear the expected count for
the entire plugin.

Use the job status to guide whether to update or remove the expected
count.

The existing CSI tests for the state store incorrectly modeled the
updates we received from servers vs those we received from clients,
leading to test assertions that passed when they should not.

Rework the tests to clarify each step in the lifecycle and rename CSI state
store functions for clarity
This commit is contained in:
Tim Gross 2022-02-09 11:51:49 -05:00 committed by GitHub
parent 59c8558969
commit 6bd33d3fb9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 389 additions and 630 deletions

View File

@ -2178,6 +2178,39 @@ func CSIVolume(plugin *structs.CSIPlugin) *structs.CSIVolume {
}
}
func CSIPluginJob(pluginType structs.CSIPluginType, pluginID string) *structs.Job {
job := new(structs.Job)
switch pluginType {
case structs.CSIPluginTypeController:
job = Job()
job.ID = fmt.Sprintf("mock-controller-%s", pluginID)
job.Name = "job-plugin-controller"
job.TaskGroups[0].Count = 2
case structs.CSIPluginTypeNode:
job = SystemJob()
job.ID = fmt.Sprintf("mock-node-%s", pluginID)
job.Name = "job-plugin-node"
case structs.CSIPluginTypeMonolith:
job = SystemJob()
job.ID = fmt.Sprintf("mock-monolith-%s", pluginID)
job.Name = "job-plugin-monolith"
}
job.TaskGroups[0].Name = "plugin"
job.TaskGroups[0].Tasks[0].Name = "plugin"
job.TaskGroups[0].Tasks[0].Driver = "docker"
job.TaskGroups[0].Tasks[0].Services = nil
job.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{
ID: pluginID,
Type: pluginType,
MountDir: "/csi",
}
job.Canonicalize()
return job
}
func Events(index uint64) *structs.Events {
return &structs.Events{
Index: index,

View File

@ -853,7 +853,7 @@ func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error {
if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
if err := upsertNodeCSIPlugins(txn, node, index); err != nil {
if err := upsertCSIPluginsForNode(txn, node, index); err != nil {
return fmt.Errorf("csi plugin update failed: %v", err)
}
@ -1178,11 +1178,11 @@ func appendNodeEvents(index uint64, node *structs.Node, events []*structs.NodeEv
}
}
// upsertNodeCSIPlugins indexes csi plugins for volume retrieval, with health. It's called
// upsertCSIPluginsForNode indexes csi plugins for volume retrieval, with health. It's called
// on upsertNodeEvents, so that event driven health changes are updated
func upsertNodeCSIPlugins(txn *txn, node *structs.Node, index uint64) error {
func upsertCSIPluginsForNode(txn *txn, node *structs.Node, index uint64) error {
loop := func(info *structs.CSIInfo) error {
upsertFn := func(info *structs.CSIInfo) error {
raw, err := txn.First("csi_plugins", "id", info.PluginID)
if err != nil {
return fmt.Errorf("csi_plugin lookup error: %s %v", info.PluginID, err)
@ -1226,7 +1226,7 @@ func upsertNodeCSIPlugins(txn *txn, node *structs.Node, index uint64) error {
inUseNode := map[string]struct{}{}
for _, info := range node.CSIControllerPlugins {
err := loop(info)
err := upsertFn(info)
if err != nil {
return err
}
@ -1234,7 +1234,7 @@ func upsertNodeCSIPlugins(txn *txn, node *structs.Node, index uint64) error {
}
for _, info := range node.CSINodePlugins {
err := loop(info)
err := upsertFn(info)
if err != nil {
return err
}
@ -3228,7 +3228,7 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *txn, index uint64, alloc *
return err
}
if err := s.updatePluginWithAlloc(index, copyAlloc, txn); err != nil {
if err := s.updatePluginForTerminalAlloc(index, copyAlloc, txn); err != nil {
return err
}
@ -3337,7 +3337,7 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
return err
}
if err := s.updatePluginWithAlloc(index, alloc, txn); err != nil {
if err := s.updatePluginForTerminalAlloc(index, alloc, txn); err != nil {
return err
}
@ -4782,7 +4782,7 @@ func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, tx
func (s *StateStore) updateJobCSIPlugins(index uint64, job, prev *structs.Job, txn *txn) error {
plugIns := make(map[string]*structs.CSIPlugin)
loop := func(job *structs.Job, delete bool) error {
upsertFn := func(job *structs.Job, delete bool) error {
for _, tg := range job.TaskGroups {
for _, t := range tg.Tasks {
if t.CSIPluginConfig == nil {
@ -4816,13 +4816,13 @@ func (s *StateStore) updateJobCSIPlugins(index uint64, job, prev *structs.Job, t
}
if prev != nil {
err := loop(prev, true)
err := upsertFn(prev, true)
if err != nil {
return err
}
}
err := loop(job, false)
err := upsertFn(job, false)
if err != nil {
return err
}
@ -5067,10 +5067,11 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
return nil
}
// updatePluginWithAlloc updates the CSI plugins for an alloc when the
// updatePluginForTerminalAlloc updates the CSI plugins for an alloc when the
// allocation is updated or inserted with a terminal server status.
func (s *StateStore) updatePluginWithAlloc(index uint64, alloc *structs.Allocation,
func (s *StateStore) updatePluginForTerminalAlloc(index uint64, alloc *structs.Allocation,
txn *txn) error {
if !alloc.ServerTerminalStatus() {
return nil
}
@ -5126,7 +5127,9 @@ func (s *StateStore) updatePluginWithJobSummary(index uint64, summary *structs.J
plug = plug.Copy()
}
plug.UpdateExpectedWithJob(alloc.Job, summary, alloc.ServerTerminalStatus())
plug.UpdateExpectedWithJob(alloc.Job, summary,
alloc.Job.Status == structs.JobStatusDead)
err = updateOrGCPlugin(index, txn, plug)
if err != nil {
return err

File diff suppressed because it is too large Load Diff