open-nomad/nomad/volumewatcher/interfaces_test.go
Tim Gross a7a64443e1
csi: move volume claim release into volumewatcher (#7794)
This changeset adds a subsystem to run on the leader, similar to the
deployment watcher or node drainer. The `Watcher` performs a blocking
query on updates to the `CSIVolumes` table and triggers reaping of
volume claims.
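As a rough illustration only (the type and helper names below are hypothetical, not the actual volumewatcher API), the leader-side loop amounts to a blocking query that wakes when the table index advances and hands each volume to a claim reaper:

package volumewatchersketch

import (
	"context"
	"time"
)

// volume is a hypothetical, trimmed-down stand-in for a CSI volume record.
type volume struct {
	ID          string
	ModifyIndex uint64
}

// volumeState is a hypothetical view of the state store: VolumesAtIndex blocks
// until the CSIVolumes table advances past minIndex (or ctx is cancelled), then
// returns the volumes and the new table index.
type volumeState interface {
	VolumesAtIndex(ctx context.Context, minIndex uint64) ([]*volume, uint64, error)
}

// watchVolumes blocks on table updates and hands each volume to a claim
// reaper; per-volume goroutines and most error handling are omitted.
func watchVolumes(ctx context.Context, state volumeState, reapClaims func(*volume)) {
	var lastIndex uint64
	for {
		vols, index, err := state.VolumesAtIndex(ctx, lastIndex)
		if err != nil {
			if ctx.Err() != nil {
				return // shutdown or leader step-down
			}
			time.Sleep(time.Second) // back off on transient errors
			continue
		}
		lastIndex = index
		for _, vol := range vols {
			reapClaims(vol) // assess which claims can be released
		}
	}
}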

This avoids tying up scheduling workers: volume claim workloads get their
own loop immediately, rather than blocking the scheduling workers in the
core GC job on work like talking to CSI controllers.

The volume watcher is enabled on leader step-up and disabled on leader
step-down.
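A minimal sketch of that enable/disable pattern, again with illustrative names rather than the real Watcher API: step-up starts the loop with a fresh context, step-down cancels it.

package volumewatchersketch

import (
	"context"
	"sync"
)

// watcher is a hypothetical leader-only subsystem with an on/off switch.
type watcher struct {
	mu      sync.Mutex
	enabled bool
	cancel  context.CancelFunc
	run     func(ctx context.Context) // the watch loop, e.g. watchVolumes
}

// SetEnabled is the kind of hook a leader would call on step-up (true) and
// step-down (false): it starts the watch loop with a fresh context, or
// cancels the running one.
func (w *watcher) SetEnabled(enabled bool) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if enabled == w.enabled {
		return // no change
	}
	w.enabled = enabled
	if enabled {
		ctx, cancel := context.WithCancel(context.Background())
		w.cancel = cancel
		go w.run(ctx)
		return
	}
	if w.cancel != nil {
		w.cancel() // stop the loop on step-down
		w.cancel = nil
	}
}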

The volume claim GC mechanism now makes an empty claim RPC for the
volume to trigger an index bump. That in turn unblocks the blocking
query in the volume watcher so it can assess which claims can be
released for a volume.
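Continuing the illustrative sketch, the GC-side trigger is just a claim write with empty allocation and node IDs; the only purpose of the write is to advance the CSIVolumes table index so the blocking query above wakes up:

package volumewatchersketch

// claimRPC is a hypothetical stand-in for the volume claim RPC endpoint.
type claimRPC interface {
	Claim(volumeID, allocID, nodeID string) (newIndex uint64, err error)
}

// nudgeVolume makes an empty claim (no allocation, no node) against the
// volume; the resulting index bump unblocks the watcher so it can
// re-evaluate the volume's claims.
func nudgeVolume(rpc claimRPC, volumeID string) (uint64, error) {
	return rpc.Claim(volumeID, "", "")
}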
2020-04-30 09:13:00 -04:00


package volumewatcher

import (
	cstructs "github.com/hashicorp/nomad/client/structs"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/state"
	"github.com/hashicorp/nomad/nomad/structs"
)

// Create a client node with plugin info
func testNode(node *structs.Node, plugin *structs.CSIPlugin, s *state.StateStore) *structs.Node {
	if node != nil {
		return node
	}
	node = mock.Node()
	node.Attributes["nomad.version"] = "0.11.0" // client RPCs not supported on earlier versions
	node.CSINodePlugins = map[string]*structs.CSIInfo{
		plugin.ID: {
			PluginID:                 plugin.ID,
			Healthy:                  true,
			RequiresControllerPlugin: plugin.ControllerRequired,
			NodeInfo:                 &structs.CSINodeInfo{},
		},
	}
	if plugin.ControllerRequired {
		node.CSIControllerPlugins = map[string]*structs.CSIInfo{
			plugin.ID: {
				PluginID:                 plugin.ID,
				Healthy:                  true,
				RequiresControllerPlugin: true,
				ControllerInfo: &structs.CSIControllerInfo{
					SupportsReadOnlyAttach:           true,
					SupportsAttachDetach:             true,
					SupportsListVolumes:              true,
					SupportsListVolumesAttachedNodes: false,
				},
			},
		}
	} else {
		node.CSIControllerPlugins = map[string]*structs.CSIInfo{}
	}
	s.UpsertNode(99, node)
	return node
}

// Create a test volume with claim info
func testVolume(vol *structs.CSIVolume, plugin *structs.CSIPlugin, alloc *structs.Allocation, nodeID string) *structs.CSIVolume {
	if vol != nil {
		return vol
	}
	vol = mock.CSIVolume(plugin)
	vol.ControllerRequired = plugin.ControllerRequired
	vol.ReadAllocs = map[string]*structs.Allocation{alloc.ID: alloc}
	vol.ReadClaims = map[string]*structs.CSIVolumeClaim{
		alloc.ID: {
			AllocationID: alloc.ID,
			NodeID:       nodeID,
			Mode:         structs.CSIVolumeClaimRead,
			State:        structs.CSIVolumeClaimStateTaken,
		},
	}
	return vol
}

// COMPAT(1.0): the claim fields were added after 0.11.1; this
// mock and the associated test cases can be removed for 1.0
func testOldVolume(vol *structs.CSIVolume, plugin *structs.CSIPlugin, alloc *structs.Allocation, nodeID string) *structs.CSIVolume {
	if vol != nil {
		return vol
	}
	vol = mock.CSIVolume(plugin)
	vol.ControllerRequired = plugin.ControllerRequired
	vol.ReadAllocs = map[string]*structs.Allocation{alloc.ID: alloc}
	return vol
}

// MockRPCServer implements the RPC interfaces the volume watcher depends on,
// returning canned responses and counting calls.
type MockRPCServer struct {
	state *state.StateStore

	// mock responses for ClientCSI.NodeDetachVolume
	nextCSINodeDetachResponse *cstructs.ClientCSINodeDetachVolumeResponse
	nextCSINodeDetachError    error
	countCSINodeDetachVolume  int

	// mock responses for ClientCSI.ControllerDetachVolume
	nextCSIControllerDetachVolumeResponse *cstructs.ClientCSIControllerDetachVolumeResponse
	nextCSIControllerDetachError          error
	countCSIControllerDetachVolume        int

	countUpdateClaims       int
	countUpsertVolumeClaims int
}

func (srv *MockRPCServer) ControllerDetachVolume(args *cstructs.ClientCSIControllerDetachVolumeRequest, reply *cstructs.ClientCSIControllerDetachVolumeResponse) error {
	// copy the canned response into the caller's reply; reassigning the
	// pointer parameter itself would never reach the caller
	if srv.nextCSIControllerDetachVolumeResponse != nil {
		*reply = *srv.nextCSIControllerDetachVolumeResponse
	}
	srv.countCSIControllerDetachVolume++
	return srv.nextCSIControllerDetachError
}

func (srv *MockRPCServer) NodeDetachVolume(args *cstructs.ClientCSINodeDetachVolumeRequest, reply *cstructs.ClientCSINodeDetachVolumeResponse) error {
	// same pattern: copy the canned response rather than reassigning the pointer
	if srv.nextCSINodeDetachResponse != nil {
		*reply = *srv.nextCSINodeDetachResponse
	}
	srv.countCSINodeDetachVolume++
	return srv.nextCSINodeDetachError
}

func (srv *MockRPCServer) UpsertVolumeClaims(*structs.CSIVolumeClaimBatchRequest) (uint64, error) {
	srv.countUpsertVolumeClaims++
	return 0, nil
}

func (srv *MockRPCServer) State() *state.StateStore { return srv.state }

func (srv *MockRPCServer) UpdateClaims(claims []structs.CSIVolumeClaimRequest) (uint64, error) {
	srv.countUpdateClaims++
	return 0, nil
}

// MockBatchingRPCServer is a MockRPCServer that routes claim updates
// through a real VolumeUpdateBatcher.
type MockBatchingRPCServer struct {
	MockRPCServer
	volumeUpdateBatcher *VolumeUpdateBatcher
}

func (srv *MockBatchingRPCServer) UpdateClaims(claims []structs.CSIVolumeClaimRequest) (uint64, error) {
	srv.countUpdateClaims++
	return srv.volumeUpdateBatcher.CreateUpdate(claims).Results()
}

// MockStatefulRPCServer is a MockRPCServer that applies upserted claim
// batches to its state store.
type MockStatefulRPCServer struct {
	MockRPCServer
	volumeUpdateBatcher *VolumeUpdateBatcher
}

func (srv *MockStatefulRPCServer) UpsertVolumeClaims(batch *structs.CSIVolumeClaimBatchRequest) (uint64, error) {
	srv.countUpsertVolumeClaims++
	index, _ := srv.state.LatestIndex()
	// apply each claim in the batch at the next index
	for _, req := range batch.Claims {
		index++
		err := srv.state.CSIVolumeClaim(index, req.RequestNamespace(),
			req.VolumeID, req.ToClaim())
		if err != nil {
			return 0, err
		}
	}
	return index, nil
}