CSI: tests to exercise csi_hook (#11788)
Small refactoring of the allocrunner hook for CSI to make it more testable, and a unit test that covers most of its logic.
This commit is contained in:
parent
32f150d469
commit
5eda9be7b0
|
@ -163,7 +163,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
|
|||
}),
|
||||
newConsulGRPCSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
|
||||
newConsulHTTPSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
|
||||
newCSIHook(ar, hookLogger, alloc, ar.rpcClient, ar.csiManager, hrs),
|
||||
newCSIHook(alloc, hookLogger, ar.csiManager, ar.rpcClient, ar, hrs, ar.clientConfig.Node.SecretID),
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -16,15 +16,35 @@ import (
|
|||
//
|
||||
// It is a noop for allocs that do not depend on CSI Volumes.
|
||||
type csiHook struct {
|
||||
ar *allocRunner
|
||||
alloc *structs.Allocation
|
||||
logger hclog.Logger
|
||||
csimanager csimanager.Manager
|
||||
rpcClient RPCer
|
||||
updater hookResourceSetter
|
||||
alloc *structs.Allocation
|
||||
logger hclog.Logger
|
||||
csimanager csimanager.Manager
|
||||
rpcClient RPCer
|
||||
taskCapabilityGetter taskCapabilityGetter
|
||||
updater hookResourceSetter
|
||||
nodeSecret string
|
||||
|
||||
volumeRequests map[string]*volumeAndRequest
|
||||
}
|
||||
|
||||
// implemented by allocrunner
//
// taskCapabilityGetter exposes the single allocrunner method this hook
// needs to validate task driver mount support, keeping the hook
// decoupled from the concrete allocRunner type so it can be mocked in
// tests.
type taskCapabilityGetter interface {
	GetTaskDriverCapabilities(string) (*drivers.Capabilities, error)
}
|
||||
|
||||
func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.Manager, rpcClient RPCer, taskCapabilityGetter taskCapabilityGetter, updater hookResourceSetter, nodeSecret string) *csiHook {
|
||||
return &csiHook{
|
||||
alloc: alloc,
|
||||
logger: logger.Named("csi_hook"),
|
||||
csimanager: csi,
|
||||
rpcClient: rpcClient,
|
||||
taskCapabilityGetter: taskCapabilityGetter,
|
||||
updater: updater,
|
||||
nodeSecret: nodeSecret,
|
||||
volumeRequests: map[string]*volumeAndRequest{},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *csiHook) Name() string {
|
||||
return "csi_hook"
|
||||
}
|
||||
|
@ -109,7 +129,7 @@ func (c *csiHook) Postrun() error {
|
|||
WriteRequest: structs.WriteRequest{
|
||||
Region: c.alloc.Job.Region,
|
||||
Namespace: c.alloc.Job.Namespace,
|
||||
AuthToken: c.ar.clientConfig.Node.SecretID,
|
||||
AuthToken: c.nodeSecret,
|
||||
},
|
||||
}
|
||||
err := c.rpcClient.RPC("CSIVolume.Unpublish",
|
||||
|
@ -142,7 +162,7 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
|
|||
if volumeRequest.Type == structs.VolumeTypeCSI {
|
||||
|
||||
for _, task := range tg.Tasks {
|
||||
caps, err := c.ar.GetTaskDriverCapabilities(task.Name)
|
||||
caps, err := c.taskCapabilityGetter.GetTaskDriverCapabilities(task.Name)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not validate task driver capabilities: %v", err)
|
||||
}
|
||||
|
@ -180,13 +200,13 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
|
|||
WriteRequest: structs.WriteRequest{
|
||||
Region: c.alloc.Job.Region,
|
||||
Namespace: c.alloc.Job.Namespace,
|
||||
AuthToken: c.ar.clientConfig.Node.SecretID,
|
||||
AuthToken: c.nodeSecret,
|
||||
},
|
||||
}
|
||||
|
||||
var resp structs.CSIVolumeClaimResponse
|
||||
if err := c.rpcClient.RPC("CSIVolume.Claim", req, &resp); err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("could not claim volume %s: %w", req.VolumeID, err)
|
||||
}
|
||||
|
||||
if resp.Volume == nil {
|
||||
|
@ -201,17 +221,6 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
|
|||
return result, nil
|
||||
}
|
||||
|
||||
// newCSIHook constructs a csiHook wired directly to the concrete
// allocRunner.
//
// NOTE(review): this is the pre-refactor constructor that this change
// removes; it overlaps with the new variant that takes the narrow
// taskCapabilityGetter interface and an explicit nodeSecret for
// testability.
func newCSIHook(ar *allocRunner, logger hclog.Logger, alloc *structs.Allocation, rpcClient RPCer, csi csimanager.Manager, updater hookResourceSetter) *csiHook {
	return &csiHook{
		ar:         ar,
		alloc:      alloc,
		logger:     logger.Named("csi_hook"),
		rpcClient:  rpcClient,
		csimanager: csi,
		updater:    updater,
	}
}
|
||||
|
||||
func (c *csiHook) shouldRun() bool {
|
||||
tg := c.alloc.Job.LookupTaskGroup(c.alloc.TaskGroup)
|
||||
for _, vol := range tg.Volumes {
|
||||
|
|
|
@ -0,0 +1,256 @@
|
|||
package allocrunner
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
|
||||
"github.com/hashicorp/nomad/client/pluginmanager"
|
||||
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/plugins/drivers"
|
||||
)
|
||||
|
||||
// Compile-time assertions that csiHook implements the runner hook
// interfaces it is registered for.
var _ interfaces.RunnerPrerunHook = (*csiHook)(nil)
var _ interfaces.RunnerPostrunHook = (*csiHook)(nil)

// TODO https://github.com/hashicorp/nomad/issues/11786
// we should implement Update as well
// var _ interfaces.RunnerUpdateHook = (*csiHook)(nil)
|
||||
|
||||
// TestCSIHook exercises the Prerun/Postrun lifecycle of csiHook
// against mocked RPC, plugin-manager, and allocrunner dependencies,
// verifying both the mounts recorded in the alloc hook resources and
// the number of mount/unmount/claim/unpublish calls made.
func TestCSIHook(t *testing.T) {

	alloc := mock.Alloc()
	logger := testlog.HCLogger(t)

	testcases := []struct {
		name                   string
		volumeRequests         map[string]*structs.VolumeRequest
		expectedMounts         map[string]*csimanager.MountInfo
		expectedMountCalls     int
		expectedUnmountCalls   int
		expectedClaimCalls     int
		expectedUnpublishCalls int
	}{

		{
			name: "simple case",
			volumeRequests: map[string]*structs.VolumeRequest{
				"vol0": {
					Name:           "vol0",
					Type:           structs.VolumeTypeCSI,
					Source:         "testvolume0",
					ReadOnly:       true,
					AccessMode:     structs.CSIVolumeAccessModeSingleNodeReader,
					AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
					MountOptions:   &structs.CSIMountOptions{},
					PerAlloc:       false,
				},
			},
			// mount path mirrors mockVolumeMounter.MountVolume:
			// test-alloc-dir/<allocID>/<volID>/<usage-mode>
			expectedMounts: map[string]*csimanager.MountInfo{
				"vol0": &csimanager.MountInfo{Source: fmt.Sprintf(
					"test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)},
			},
			expectedMountCalls:     1,
			expectedUnmountCalls:   0, // not until this is done client-side
			expectedClaimCalls:     1,
			expectedUnpublishCalls: 1,
		},

		{
			name: "per-alloc case",
			volumeRequests: map[string]*structs.VolumeRequest{
				"vol0": {
					Name:           "vol0",
					Type:           structs.VolumeTypeCSI,
					Source:         "testvolume0",
					ReadOnly:       true,
					AccessMode:     structs.CSIVolumeAccessModeSingleNodeReader,
					AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
					MountOptions:   &structs.CSIMountOptions{},
					PerAlloc:       true,
				},
			},
			expectedMounts: map[string]*csimanager.MountInfo{
				"vol0": &csimanager.MountInfo{Source: fmt.Sprintf(
					"test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)},
			},
			expectedMountCalls:     1,
			expectedUnmountCalls:   0, // not until this is done client-side
			expectedClaimCalls:     1,
			expectedUnpublishCalls: 1,
		},

		// TODO: this won't actually work on the client.
		// https://github.com/hashicorp/nomad/issues/11798
		//
		// {
		// 	name: "one source volume mounted read-only twice",
		// 	volumeRequests: map[string]*structs.VolumeRequest{
		// 		"vol0": {
		// 			Name:           "vol0",
		// 			Type:           structs.VolumeTypeCSI,
		// 			Source:         "testvolume0",
		// 			ReadOnly:       true,
		// 			AccessMode:     structs.CSIVolumeAccessModeMultiNodeReader,
		// 			AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
		// 			MountOptions:   &structs.CSIMountOptions{},
		// 			PerAlloc:       false,
		// 		},
		// 		"vol1": {
		// 			Name:           "vol1",
		// 			Type:           structs.VolumeTypeCSI,
		// 			Source:         "testvolume0",
		// 			ReadOnly:       false,
		// 			AccessMode:     structs.CSIVolumeAccessModeMultiNodeReader,
		// 			AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
		// 			MountOptions:   &structs.CSIMountOptions{},
		// 			PerAlloc:       false,
		// 		},
		// 	},
		// 	expectedMounts: map[string]*csimanager.MountInfo{
		// 		"vol0": &csimanager.MountInfo{Source: fmt.Sprintf(
		// 			"test-alloc-dir/%s/testvolume0/ro-file-system-multi-node-reader-only", alloc.ID)},
		// 		"vol1": &csimanager.MountInfo{Source: fmt.Sprintf(
		// 			"test-alloc-dir/%s/testvolume0/ro-file-system-multi-node-reader-only", alloc.ID)},
		// 	},
		// 	expectedMountCalls:     1,
		// 	expectedUnmountCalls:   0, // not until this is done client-side
		// 	expectedClaimCalls:     1,
		// 	expectedUnpublishCalls: 1,
		// },
	}

	for i := range testcases {
		tc := testcases[i]
		t.Run(tc.name, func(t *testing.T) {
			alloc.Job.TaskGroups[0].Volumes = tc.volumeRequests

			// callCounts is shared by all the mocks so each RPC/mount
			// operation can be tallied under a short key.
			callCounts := map[string]int{}
			mgr := mockPluginManager{mounter: mockVolumeMounter{callCounts: callCounts}}
			rpcer := mockRPCer{alloc: alloc, callCounts: callCounts}
			ar := mockAllocRunner{
				res: &cstructs.AllocHookResources{},
				caps: &drivers.Capabilities{
					FSIsolation:  drivers.FSIsolationChroot,
					MountConfigs: drivers.MountConfigSupportAll,
				},
			}
			// ar serves as both taskCapabilityGetter and updater here.
			hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar, "secret")
			require.NotNil(t, hook)

			require.NoError(t, hook.Prerun())
			mounts := ar.GetAllocHookResources().GetCSIMounts()
			require.NotNil(t, mounts)
			require.Equal(t, tc.expectedMounts, mounts)

			require.NoError(t, hook.Postrun())
			require.Equal(t, tc.expectedMountCalls, callCounts["mount"])
			require.Equal(t, tc.expectedUnmountCalls, callCounts["unmount"])
			require.Equal(t, tc.expectedClaimCalls, callCounts["claim"])
			require.Equal(t, tc.expectedUnpublishCalls, callCounts["unpublish"])

		})
	}

}
|
||||
|
||||
// HELPERS AND MOCKS
|
||||
|
||||
func testVolume(id string) *structs.CSIVolume {
|
||||
vol := structs.NewCSIVolume(id, 0)
|
||||
vol.Schedulable = true
|
||||
vol.RequestedCapabilities = []*structs.CSIVolumeCapability{
|
||||
{
|
||||
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
|
||||
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
|
||||
},
|
||||
{
|
||||
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
|
||||
AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter,
|
||||
},
|
||||
}
|
||||
return vol
|
||||
}
|
||||
|
||||
// mockRPCer stands in for the client's server RPC connection. Calls
// are tallied in the shared callCounts map under short keys ("claim",
// "unpublish").
type mockRPCer struct {
	alloc      *structs.Allocation
	callCounts map[string]int
}
|
||||
|
||||
// RPC mocks the server RPCs, acting as though any request succeeds
|
||||
func (r mockRPCer) RPC(method string, args interface{}, reply interface{}) error {
|
||||
switch method {
|
||||
case "CSIVolume.Claim":
|
||||
r.callCounts["claim"]++
|
||||
req := args.(*structs.CSIVolumeClaimRequest)
|
||||
vol := testVolume(req.VolumeID)
|
||||
err := vol.Claim(req.ToClaim(), r.alloc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resp := reply.(*structs.CSIVolumeClaimResponse)
|
||||
resp.PublishContext = map[string]string{}
|
||||
resp.Volume = vol
|
||||
resp.QueryMeta = structs.QueryMeta{}
|
||||
case "CSIVolume.Unpublish":
|
||||
r.callCounts["unpublish"]++
|
||||
resp := reply.(*structs.CSIVolumeUnpublishResponse)
|
||||
resp.QueryMeta = structs.QueryMeta{}
|
||||
default:
|
||||
return fmt.Errorf("unexpected method")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// mockVolumeMounter is a csimanager.VolumeMounter that records mount
// and unmount calls in the shared callCounts map.
type mockVolumeMounter struct {
	callCounts map[string]int
}
|
||||
|
||||
func (vm mockVolumeMounter) MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usageOpts *csimanager.UsageOptions, publishContext map[string]string) (*csimanager.MountInfo, error) {
|
||||
vm.callCounts["mount"]++
|
||||
return &csimanager.MountInfo{
|
||||
Source: filepath.Join("test-alloc-dir", alloc.ID, vol.ID, usageOpts.ToFS()),
|
||||
}, nil
|
||||
}
|
||||
// UnmountVolume mocks a successful unmount; it only records the call.
func (vm mockVolumeMounter) UnmountVolume(ctx context.Context, volID, remoteID, allocID string, usageOpts *csimanager.UsageOptions) error {
	vm.callCounts["unmount"]++
	return nil
}
|
||||
|
||||
// mockPluginManager is a csimanager.Manager that hands out the same
// mockVolumeMounter for every plugin.
type mockPluginManager struct {
	mounter mockVolumeMounter
}
|
||||
|
||||
// MounterForPlugin returns the shared mock mounter regardless of the
// plugin ID.
func (mgr mockPluginManager) MounterForPlugin(ctx context.Context, pluginID string) (csimanager.VolumeMounter, error) {
	return mgr.mounter, nil
}
|
||||
|
||||
// no-op methods to fulfill the csimanager.Manager interface
func (mgr mockPluginManager) PluginManager() pluginmanager.PluginManager { return nil }
func (mgr mockPluginManager) Shutdown()                                  {}
|
||||
|
||||
// mockAllocRunner provides the hookResourceSetter and
// taskCapabilityGetter behavior the csiHook needs, backed by canned
// values.
type mockAllocRunner struct {
	res  *cstructs.AllocHookResources // shared hook resources read by the test
	caps *drivers.Capabilities        // capabilities returned for every task
}
|
||||
|
||||
// GetAllocHookResources returns the shared hook resources.
func (ar mockAllocRunner) GetAllocHookResources() *cstructs.AllocHookResources {
	return ar.res
}
|
||||
|
||||
// SetAllocHookResources records the hook resources.
//
// NOTE(review): with a value receiver this assignment mutates a copy
// and is invisible to the caller; the test observes state through the
// shared *cstructs.AllocHookResources pointer instead — confirm this
// is intentional before relying on the setter.
func (ar mockAllocRunner) SetAllocHookResources(res *cstructs.AllocHookResources) {
	ar.res = res
}
|
||||
|
||||
// GetTaskDriverCapabilities returns the canned capabilities for any
// task name, never failing.
func (ar mockAllocRunner) GetTaskDriverCapabilities(taskName string) (*drivers.Capabilities, error) {
	return ar.caps, nil
}
|
Loading…
Reference in New Issue