csi: fix plugin counts on node update (#7844)

In this changeset:

* If a Nomad client node is running both a controller and a node
  plugin (which is a common case), then if only the controller or the
  node is removed, the plugin was not being updated with the correct
  counts.
* The existing test for plugin cleanup didn't go back to the state
  store, which normally is ok but is complicated in this case by
  denormalization which changes the behavior. This commit makes the
  test more comprehensive.
* Set "controller required" when plugin has `PUBLISH_READONLY`. All
  known controllers that support `PUBLISH_READONLY` also support
  `PUBLISH_UNPUBLISH_VOLUME` but we shouldn't assume this.
* Only create plugins when the allocs for those plugins are
  healthy. If we allow a plugin to be created for the first time when
  the alloc is not healthy, then we'll recreate deleted plugins when
  the job's allocs all get marked terminal.
* Terminal plugin alloc updates should cleanup the plugin. The client
  fingerprint can't tell if the plugin is unhealthy intentionally (for
  the case of updates or job stop). Allocations that are
  server-terminal should delete themselves from the plugin and trigger
  a plugin self-GC, the same as an unused node.
This commit is contained in:
Tim Gross 2020-05-05 15:39:57 -04:00 committed by GitHub
parent 3cca738478
commit ce86a594a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 499 additions and 65 deletions

View File

@ -560,10 +560,12 @@ func TestCSI_RPCVolumeAndPluginLookup(t *testing.T) {
// Create a client node with a plugin
node := mock.Node()
node.CSINodePlugins = map[string]*structs.CSIInfo{
node.CSIControllerPlugins = map[string]*structs.CSIInfo{
"minnie": {PluginID: "minnie", Healthy: true, RequiresControllerPlugin: true,
ControllerInfo: &structs.CSIControllerInfo{SupportsAttachDetach: true},
},
}
node.CSINodePlugins = map[string]*structs.CSIInfo{
"adam": {PluginID: "adam", Healthy: true},
}
err := state.UpsertNode(3, node)

View File

@ -1037,6 +1037,12 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
if raw != nil {
plug = raw.(*structs.CSIPlugin).Copy()
} else {
if !info.Healthy {
// we don't want to create new plugins for unhealthy
// allocs, otherwise we'd recreate the plugin when we
// get the update for the alloc becoming terminal
return nil
}
plug = structs.NewCSIPlugin(info.PluginID, index)
plug.Provider = info.Provider
plug.Version = info.ProviderVersion
@ -1057,13 +1063,15 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
return nil
}
inUse := map[string]struct{}{}
inUseController := map[string]struct{}{}
inUseNode := map[string]struct{}{}
for _, info := range node.CSIControllerPlugins {
err := loop(info)
if err != nil {
return err
}
inUse[info.PluginID] = struct{}{}
inUseController[info.PluginID] = struct{}{}
}
for _, info := range node.CSINodePlugins {
@ -1071,7 +1079,7 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
if err != nil {
return err
}
inUse[info.PluginID] = struct{}{}
inUseNode[info.PluginID] = struct{}{}
}
// remove the client node from any plugin that's not
@ -1086,15 +1094,33 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
break
}
plug := raw.(*structs.CSIPlugin)
_, ok := inUse[plug.ID]
if !ok {
_, asController := plug.Controllers[node.ID]
_, asNode := plug.Nodes[node.ID]
if asController || asNode {
err = deleteNodeFromPlugin(txn, plug.Copy(), node, index)
var hadDelete bool
if _, ok := inUseController[plug.ID]; !ok {
if _, asController := plug.Controllers[node.ID]; asController {
err := plug.DeleteNodeForType(node.ID, structs.CSIPluginTypeController)
if err != nil {
return err
}
hadDelete = true
}
}
if _, ok := inUseNode[plug.ID]; !ok {
if _, asNode := plug.Nodes[node.ID]; asNode {
err := plug.DeleteNodeForType(node.ID, structs.CSIPluginTypeNode)
if err != nil {
return err
}
hadDelete = true
}
}
// we check this flag both for performance and to make sure we
// don't delete a plugin when registering a node plugin but
// no controller
if hadDelete {
err = updateOrGCPlugin(index, txn, plug)
if err != nil {
return err
}
}
}
@ -1130,7 +1156,11 @@ func deleteNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
}
plug := raw.(*structs.CSIPlugin).Copy()
err = deleteNodeFromPlugin(txn, plug, node, index)
err = plug.DeleteNode(node.ID)
if err != nil {
return err
}
err = updateOrGCPlugin(index, txn, plug)
if err != nil {
return err
}
@ -1143,14 +1173,6 @@ func deleteNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
return nil
}
func deleteNodeFromPlugin(txn *memdb.Txn, plug *structs.CSIPlugin, node *structs.Node, index uint64) error {
err := plug.DeleteNode(node.ID)
if err != nil {
return err
}
return updateOrGCPlugin(index, txn, plug)
}
// updateOrGCPlugin updates a plugin but will delete it if the plugin is empty
func updateOrGCPlugin(index uint64, txn *memdb.Txn, plug *structs.CSIPlugin) error {
plug.ModifyIndex = index
@ -1187,7 +1209,6 @@ func (s *StateStore) deleteJobFromPlugin(index uint64, txn *memdb.Txn, job *stru
plugins := map[string]*structs.CSIPlugin{}
for _, a := range allocs {
// if its nil, we can just panic
tg := a.Job.LookupTaskGroup(a.TaskGroup)
for _, t := range tg.Tasks {
if t.CSIPluginConfig != nil {
@ -2137,7 +2158,6 @@ func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs
if vol == nil {
return nil, nil
}
// Lookup CSIPlugin, the health records, and calculate volume health
txn := s.db.Txn(false)
defer txn.Abort()
@ -2796,6 +2816,10 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, a
return err
}
if err := s.updatePluginWithAlloc(index, copyAlloc, txn); err != nil {
return err
}
// Update the allocation
if err := txn.Insert("allocs", copyAlloc); err != nil {
return fmt.Errorf("alloc insert failed: %v", err)
@ -2902,6 +2926,10 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
return err
}
if err := s.updatePluginWithAlloc(index, alloc, txn); err != nil {
return err
}
if err := txn.Insert("allocs", alloc); err != nil {
return fmt.Errorf("alloc insert failed: %v", err)
}
@ -4559,6 +4587,42 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
return nil
}
// updatePluginWithAlloc updates the CSI plugins for an alloc when the
// allocation is updated or inserted with a terminal server status.
func (s *StateStore) updatePluginWithAlloc(index uint64, alloc *structs.Allocation,
txn *memdb.Txn) error {
if !alloc.ServerTerminalStatus() {
return nil
}
ws := memdb.NewWatchSet()
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
for _, t := range tg.Tasks {
if t.CSIPluginConfig != nil {
pluginID := t.CSIPluginConfig.ID
plug, err := s.CSIPluginByID(ws, pluginID)
if err != nil {
return err
}
if plug == nil {
// plugin may not have been created because it never
// became healthy, just move on
return nil
}
err = plug.DeleteAlloc(alloc.ID, alloc.NodeID)
if err != nil {
return err
}
err = updateOrGCPlugin(index, txn, plug)
if err != nil {
return err
}
}
}
return nil
}
// UpsertACLPolicies is used to create or update a set of ACL policies
func (s *StateStore) UpsertACLPolicies(index uint64, policies []*structs.ACLPolicy) error {
txn := s.db.Txn(true)

View File

@ -3027,10 +3027,11 @@ func TestStateStore_CSIVolume(t *testing.T) {
func TestStateStore_CSIPluginNodes(t *testing.T) {
index := uint64(999)
state := testStateStore(t)
ws := memdb.NewWatchSet()
plugID := "foo"
// Create Nodes fingerprinting the plugins
// Create Nomad client Nodes
ns := []*structs.Node{mock.Node(), mock.Node()}
for _, n := range ns {
index++
err := state.UpsertNode(index, n)
@ -3038,10 +3039,10 @@ func TestStateStore_CSIPluginNodes(t *testing.T) {
}
// Fingerprint a running controller plugin
n0 := ns[0].Copy()
n0, _ := state.NodeByID(ws, ns[0].ID)
n0.CSIControllerPlugins = map[string]*structs.CSIInfo{
"foo": {
PluginID: "foo",
plugID: {
PluginID: plugID,
Healthy: true,
UpdateTime: time.Now(),
RequiresControllerPlugin: true,
@ -3052,17 +3053,16 @@ func TestStateStore_CSIPluginNodes(t *testing.T) {
},
},
}
index++
err := state.UpsertNode(index, n0)
require.NoError(t, err)
// Fingerprint two running node plugins
for _, n := range ns[:] {
n = n.Copy()
n, _ := state.NodeByID(ws, n.ID)
n.CSINodePlugins = map[string]*structs.CSIInfo{
"foo": {
PluginID: "foo",
plugID: {
PluginID: plugID,
Healthy: true,
UpdateTime: time.Now(),
RequiresControllerPlugin: true,
@ -3070,24 +3070,38 @@ func TestStateStore_CSIPluginNodes(t *testing.T) {
NodeInfo: &structs.CSINodeInfo{},
},
}
index++
err = state.UpsertNode(index, n)
require.NoError(t, err)
}
ws := memdb.NewWatchSet()
plug, err := state.CSIPluginByID(ws, "foo")
plug, err := state.CSIPluginByID(ws, plugID)
require.NoError(t, err)
require.True(t, plug.ControllerRequired)
require.Equal(t, 1, plug.ControllersHealthy, "controllers healthy")
require.Equal(t, 2, plug.NodesHealthy, "nodes healthy")
require.Equal(t, 1, len(plug.Controllers), "controllers expected")
require.Equal(t, 2, len(plug.Nodes), "nodes expected")
// Volume using the plugin
index++
vol := &structs.CSIVolume{
ID: uuid.Generate(),
Namespace: structs.DefaultNamespace,
PluginID: plugID,
}
err = state.CSIVolumeRegister(index, []*structs.CSIVolume{vol})
require.NoError(t, err)
require.Equal(t, "foo", plug.ID)
require.Equal(t, 1, plug.ControllersHealthy)
require.Equal(t, 2, plug.NodesHealthy)
vol, err = state.CSIVolumeByID(ws, structs.DefaultNamespace, vol.ID)
require.NoError(t, err)
require.True(t, vol.Schedulable, "volume should be schedulable")
// Controller is unhealthy
n0, _ = state.NodeByID(ws, ns[0].ID)
n0.CSIControllerPlugins = map[string]*structs.CSIInfo{
"foo": {
PluginID: "foo",
plugID: {
PluginID: plugID,
Healthy: false,
UpdateTime: time.Now(),
RequiresControllerPlugin: true,
@ -3103,56 +3117,404 @@ func TestStateStore_CSIPluginNodes(t *testing.T) {
err = state.UpsertNode(index, n0)
require.NoError(t, err)
plug, err = state.CSIPluginByID(ws, "foo")
require.NoError(t, err)
require.Equal(t, "foo", plug.ID)
require.Equal(t, 0, plug.ControllersHealthy)
require.Equal(t, 2, plug.NodesHealthy)
// Volume using the plugin
index++
vol := &structs.CSIVolume{
ID: uuid.Generate(),
Namespace: structs.DefaultNamespace,
PluginID: "foo",
}
err = state.CSIVolumeRegister(index, []*structs.CSIVolume{vol})
plug, err = state.CSIPluginByID(ws, plugID)
require.NoError(t, err)
require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy")
require.Equal(t, 2, plug.NodesHealthy, "nodes healthy")
require.Equal(t, 1, len(plug.Controllers), "controllers expected")
require.Equal(t, 2, len(plug.Nodes), "nodes expected")
vol, err = state.CSIVolumeByID(ws, structs.DefaultNamespace, vol.ID)
require.NoError(t, err)
require.True(t, vol.Schedulable)
require.False(t, vol.Schedulable, "volume should not be schedulable")
// Node plugin is removed
n1 := ns[1].Copy()
n1, _ := state.NodeByID(ws, ns[1].ID)
n1.CSINodePlugins = map[string]*structs.CSIInfo{}
index++
err = state.UpsertNode(index, n1)
require.NoError(t, err)
plug, err = state.CSIPluginByID(ws, "foo")
plug, err = state.CSIPluginByID(ws, plugID)
require.NoError(t, err)
require.Equal(t, "foo", plug.ID)
require.Equal(t, 0, plug.ControllersHealthy)
require.Equal(t, 1, plug.NodesHealthy)
require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy")
require.Equal(t, 1, plug.NodesHealthy, "nodes healthy")
require.Equal(t, 1, len(plug.Controllers), "controllers expected")
require.Equal(t, 1, len(plug.Nodes), "nodes expected")
// Last plugin is removed
n0 = ns[0].Copy()
// Last node plugin is removed
n0, _ = state.NodeByID(ws, ns[0].ID)
n0.CSINodePlugins = map[string]*structs.CSIInfo{}
index++
err = state.UpsertNode(index, n0)
require.NoError(t, err)
plug, err = state.CSIPluginByID(ws, "foo")
// Nodes plugins should be gone but controllers left
plug, err = state.CSIPluginByID(ws, plugID)
require.NoError(t, err)
require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy")
require.Equal(t, 0, plug.NodesHealthy, "nodes healthy")
require.Equal(t, 1, len(plug.Controllers), "controllers expected")
require.Equal(t, 0, len(plug.Nodes), "nodes expected")
// A node plugin is restored
n0, _ = state.NodeByID(ws, n0.ID)
n0.CSINodePlugins = map[string]*structs.CSIInfo{
plugID: {
PluginID: plugID,
Healthy: true,
UpdateTime: time.Now(),
RequiresControllerPlugin: true,
RequiresTopologies: false,
NodeInfo: &structs.CSINodeInfo{},
},
}
index++
err = state.UpsertNode(index, n0)
require.NoError(t, err)
// Nodes plugin should be replaced and healthy
plug, err = state.CSIPluginByID(ws, plugID)
require.NoError(t, err)
require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy")
require.Equal(t, 1, plug.NodesHealthy, "nodes healthy")
require.Equal(t, 1, len(plug.Controllers), "controllers expected")
require.Equal(t, 1, len(plug.Nodes), "nodes expected")
// Remove node again
n0, _ = state.NodeByID(ws, ns[0].ID)
n0.CSINodePlugins = map[string]*structs.CSIInfo{}
index++
err = state.UpsertNode(index, n0)
require.NoError(t, err)
// Nodes plugins should be gone but controllers left
plug, err = state.CSIPluginByID(ws, plugID)
require.NoError(t, err)
require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy")
require.Equal(t, 0, plug.NodesHealthy, "nodes healthy")
require.Equal(t, 1, len(plug.Controllers), "controllers expected")
require.Equal(t, 0, len(plug.Nodes), "nodes expected")
// controller is removed
n0, _ = state.NodeByID(ws, ns[0].ID)
n0.CSIControllerPlugins = map[string]*structs.CSIInfo{}
index++
err = state.UpsertNode(index, n0)
require.NoError(t, err)
// Plugin has been removed entirely
plug, err = state.CSIPluginByID(ws, plugID)
require.NoError(t, err)
require.Nil(t, plug)
// Volume exists and is safe to query, but unschedulable
// Volume still exists and is safe to query, but unschedulable
vol, err = state.CSIVolumeByID(ws, structs.DefaultNamespace, vol.ID)
require.NoError(t, err)
require.False(t, vol.Schedulable)
}
// TestStateStore_CSIPluginAllocUpdates tests the ordering
// interactions for CSI plugins between Nomad client node updates and
// allocation updates.
func TestStateStore_CSIPluginAllocUpdates(t *testing.T) {
t.Parallel()
index := uint64(999)
state := testStateStore(t)
ws := memdb.NewWatchSet()
n := mock.Node()
index++
err := state.UpsertNode(index, n)
require.NoError(t, err)
// (1) unhealthy fingerprint, then terminal alloc, then healthy node update
plugID0 := "foo0"
alloc0 := mock.Alloc()
alloc0.NodeID = n.ID
alloc0.DesiredStatus = "run"
alloc0.ClientStatus = "running"
alloc0.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{ID: plugID0}
index++
err = state.UpsertAllocs(index, []*structs.Allocation{alloc0})
require.NoError(t, err)
n, _ = state.NodeByID(ws, n.ID)
n.CSINodePlugins = map[string]*structs.CSIInfo{
plugID0: {
PluginID: plugID0,
AllocID: alloc0.ID,
Healthy: false,
UpdateTime: time.Now(),
RequiresControllerPlugin: true,
NodeInfo: &structs.CSINodeInfo{},
},
}
index++
err = state.UpsertNode(index, n)
require.NoError(t, err)
plug, err := state.CSIPluginByID(ws, plugID0)
require.NoError(t, err)
require.Nil(t, plug, "no plugin should exist: not yet healthy")
alloc0.DesiredStatus = "stopped"
alloc0.ClientStatus = "complete"
index++
err = state.UpsertAllocs(index, []*structs.Allocation{alloc0})
require.NoError(t, err)
plug, err = state.CSIPluginByID(ws, plugID0)
require.NoError(t, err)
require.Nil(t, plug, "no plugin should exist: allocs never healthy")
n, _ = state.NodeByID(ws, n.ID)
n.CSINodePlugins[plugID0].Healthy = true
n.CSINodePlugins[plugID0].UpdateTime = time.Now()
index++
err = state.UpsertNode(index, n)
require.NoError(t, err)
plug, err = state.CSIPluginByID(ws, plugID0)
require.NoError(t, err)
require.NotNil(t, plug, "plugin should exist")
// (2) healthy fingerprint, then terminal alloc update
plugID1 := "foo1"
alloc1 := mock.Alloc()
n, _ = state.NodeByID(ws, n.ID)
n.CSINodePlugins = map[string]*structs.CSIInfo{
plugID1: {
PluginID: plugID1,
AllocID: alloc1.ID,
Healthy: true,
UpdateTime: time.Now(),
RequiresControllerPlugin: true,
NodeInfo: &structs.CSINodeInfo{},
},
}
index++
err = state.UpsertNode(index, n)
require.NoError(t, err)
plug, err = state.CSIPluginByID(ws, plugID1)
require.NoError(t, err)
require.NotNil(t, plug, "plugin should exist")
alloc1.NodeID = n.ID
alloc1.DesiredStatus = "stop"
alloc1.ClientStatus = "complete"
alloc1.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{ID: plugID1}
index++
err = state.UpsertAllocs(index, []*structs.Allocation{alloc1})
require.NoError(t, err)
plug, err = state.CSIPluginByID(ws, plugID1)
require.NoError(t, err)
require.Nil(t, plug, "no plugin should exist: alloc became terminal")
// (3) terminal alloc update, then unhealthy fingerprint
plugID2 := "foo2"
alloc2 := mock.Alloc()
alloc2.NodeID = n.ID
alloc2.DesiredStatus = "stop"
alloc2.ClientStatus = "complete"
alloc2.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{ID: plugID2}
index++
err = state.UpsertAllocs(index, []*structs.Allocation{alloc2})
require.NoError(t, err)
plug, err = state.CSIPluginByID(ws, plugID2)
require.NoError(t, err)
require.Nil(t, plug, "no plugin should exist: alloc became terminal")
n, _ = state.NodeByID(ws, n.ID)
n.CSINodePlugins = map[string]*structs.CSIInfo{
plugID2: {
PluginID: plugID2,
AllocID: alloc2.ID,
Healthy: false,
UpdateTime: time.Now(),
RequiresControllerPlugin: true,
NodeInfo: &structs.CSINodeInfo{},
},
}
index++
err = state.UpsertNode(index, n)
require.NoError(t, err)
plug, err = state.CSIPluginByID(ws, plugID2)
require.NoError(t, err)
require.Nil(t, plug, "plugin should not exist: never became healthy")
}
// TestStateStore_CSIPluginMultiNodeUpdates tests the ordering
// interactions for CSI plugins between Nomad client node updates and
// allocation updates when multiple nodes are involved
func TestStateStore_CSIPluginMultiNodeUpdates(t *testing.T) {
t.Parallel()
index := uint64(999)
state := testStateStore(t)
ws := memdb.NewWatchSet()
var err error
// Create Nomad client Nodes
ns := []*structs.Node{mock.Node(), mock.Node()}
for _, n := range ns {
index++
err = state.UpsertNode(index, n)
require.NoError(t, err)
}
plugID := "foo"
plugCfg := &structs.TaskCSIPluginConfig{ID: plugID}
// Fingerprint two running node plugins and their allocs; we'll
// leave these in place for the test to ensure we don't GC the
// plugin
for _, n := range ns[:] {
nAlloc := mock.Alloc()
n, _ := state.NodeByID(ws, n.ID)
n.CSINodePlugins = map[string]*structs.CSIInfo{
plugID: {
PluginID: plugID,
AllocID: nAlloc.ID,
Healthy: true,
UpdateTime: time.Now(),
RequiresControllerPlugin: true,
RequiresTopologies: false,
NodeInfo: &structs.CSINodeInfo{},
},
}
index++
err = state.UpsertNode(index, n)
require.NoError(t, err)
nAlloc.NodeID = n.ID
nAlloc.DesiredStatus = "run"
nAlloc.ClientStatus = "running"
nAlloc.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = plugCfg
index++
err = state.UpsertAllocs(index, []*structs.Allocation{nAlloc})
require.NoError(t, err)
}
// Fingerprint a running controller plugin
alloc0 := mock.Alloc()
n0, _ := state.NodeByID(ws, ns[0].ID)
n0.CSIControllerPlugins = map[string]*structs.CSIInfo{
plugID: {
PluginID: plugID,
AllocID: alloc0.ID,
Healthy: true,
UpdateTime: time.Now(),
RequiresControllerPlugin: true,
RequiresTopologies: false,
ControllerInfo: &structs.CSIControllerInfo{
SupportsReadOnlyAttach: true,
SupportsListVolumes: true,
},
},
}
index++
err = state.UpsertNode(index, n0)
require.NoError(t, err)
plug, err := state.CSIPluginByID(ws, plugID)
require.NoError(t, err)
require.Equal(t, 1, plug.ControllersHealthy, "controllers healthy")
require.Equal(t, 1, len(plug.Controllers), "controllers expected")
require.Equal(t, 2, plug.NodesHealthy, "nodes healthy")
require.Equal(t, 2, len(plug.Nodes), "nodes expected")
n1, _ := state.NodeByID(ws, ns[1].ID)
alloc0.NodeID = n0.ID
alloc0.DesiredStatus = "stop"
alloc0.ClientStatus = "complete"
alloc0.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = plugCfg
index++
err = state.UpsertAllocs(index, []*structs.Allocation{alloc0})
require.NoError(t, err)
plug, err = state.CSIPluginByID(ws, plugID)
require.NoError(t, err)
require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy")
require.Equal(t, 0, len(plug.Controllers), "controllers expected")
require.Equal(t, 2, plug.NodesHealthy, "nodes healthy")
require.Equal(t, 2, len(plug.Nodes), "nodes expected")
alloc1 := mock.Alloc()
alloc1.NodeID = n1.ID
alloc1.DesiredStatus = "run"
alloc1.ClientStatus = "running"
alloc1.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = plugCfg
index++
err = state.UpsertAllocs(index, []*structs.Allocation{alloc1})
require.NoError(t, err)
plug, err = state.CSIPluginByID(ws, plugID)
require.NoError(t, err)
require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy")
require.Equal(t, 0, len(plug.Controllers), "controllers expected")
require.Equal(t, 2, plug.NodesHealthy, "nodes healthy")
require.Equal(t, 2, len(plug.Nodes), "nodes expected")
n0, _ = state.NodeByID(ws, ns[0].ID)
n0.CSIControllerPlugins = map[string]*structs.CSIInfo{
plugID: {
PluginID: plugID,
AllocID: alloc0.ID,
Healthy: false,
UpdateTime: time.Now(),
RequiresControllerPlugin: true,
RequiresTopologies: false,
ControllerInfo: &structs.CSIControllerInfo{
SupportsReadOnlyAttach: true,
SupportsListVolumes: true,
},
},
}
index++
err = state.UpsertNode(index, n0)
require.NoError(t, err)
n1.CSIControllerPlugins = map[string]*structs.CSIInfo{
plugID: {
PluginID: plugID,
AllocID: alloc1.ID,
Healthy: true,
UpdateTime: time.Now(),
RequiresControllerPlugin: true,
RequiresTopologies: false,
ControllerInfo: &structs.CSIControllerInfo{
SupportsReadOnlyAttach: true,
SupportsListVolumes: true,
},
},
}
index++
err = state.UpsertNode(index, n1)
require.NoError(t, err)
plug, err = state.CSIPluginByID(ws, plugID)
require.NoError(t, err)
require.True(t, plug.ControllerRequired)
require.Equal(t, 1, plug.ControllersHealthy, "controllers healthy")
require.Equal(t, 1, len(plug.Controllers), "controllers expected")
require.Equal(t, 2, plug.NodesHealthy, "nodes healthy")
require.Equal(t, 2, len(plug.Nodes), "nodes expected")
}
func TestStateStore_CSIPluginJobs(t *testing.T) {
s := testStateStore(t)
deleteNodes := CreateTestCSIPlugin(s, "foo")

View File

@ -701,7 +701,8 @@ func (p *CSIPlugin) Copy() *CSIPlugin {
func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo) error {
if info.ControllerInfo != nil {
p.ControllerRequired = info.RequiresControllerPlugin &&
info.ControllerInfo.SupportsAttachDetach
(info.ControllerInfo.SupportsAttachDetach ||
info.ControllerInfo.SupportsReadOnlyAttach)
prev, ok := p.Controllers[nodeID]
if ok {
@ -712,11 +713,14 @@ func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo) error {
p.ControllersHealthy -= 1
}
}
// note: for this to work as expected, only a single
// controller for a given plugin can be on a given Nomad
// client, they also conflict on the client so this should be
// ok
p.Controllers[nodeID] = info
if prev != nil || info.Healthy {
p.Controllers[nodeID] = info
}
if info.Healthy {
p.ControllersHealthy += 1
}
@ -732,7 +736,9 @@ func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo) error {
p.NodesHealthy -= 1
}
}
p.Nodes[nodeID] = info
if prev != nil || info.Healthy {
p.Nodes[nodeID] = info
}
if info.Healthy {
p.NodesHealthy += 1
}