CSI: set mounts in alloc hook resources atomically (#16722)
The allocrunner has a facility for passing data written by allocrunner hooks to taskrunner hooks. Currently the only consumers of this facility are the allocrunner CSI hook (which writes data) and the taskrunner volume hook (which reads that same data). The allocrunner hook for CSI volumes doesn't set the alloc hook resources atomically. Instead, it gets the current resources and then writes a new version back. Because the CSI hook is currently the only writer and all readers happen long afterwards, this should be safe but #16623 shows there's some sequence of events during restore where this breaks down. Refactor hook resources so that hook data is accessed via setters and getters that hold the mutex.
This commit is contained in:
parent
0c582a2c94
commit
118b703164
|
@ -128,9 +128,9 @@ type allocRunner struct {
|
|||
// transitions.
|
||||
runnerHooks []interfaces.RunnerHook
|
||||
|
||||
// hookState is the output of allocrunner hooks
|
||||
hookState *cstructs.AllocHookResources
|
||||
hookStateMu sync.RWMutex
|
||||
// hookResources holds the output from allocrunner hooks so that later
|
||||
// allocrunner hooks or task runner hooks can read them
|
||||
hookResources *cstructs.AllocHookResources
|
||||
|
||||
// tasks are the set of task runners
|
||||
tasks map[string]*taskrunner.TaskRunner
|
||||
|
@ -238,6 +238,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
|
|||
serviceRegWrapper: config.ServiceRegWrapper,
|
||||
checkStore: config.CheckStore,
|
||||
getter: config.Getter,
|
||||
hookResources: cstructs.NewAllocHookResources(),
|
||||
}
|
||||
|
||||
// Create the logger based on the allocation ID
|
||||
|
@ -293,6 +294,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
|
|||
ShutdownDelayCtx: ar.shutdownDelayCtx,
|
||||
ServiceRegWrapper: ar.serviceRegWrapper,
|
||||
Getter: ar.getter,
|
||||
AllocHookResources: ar.hookResources,
|
||||
}
|
||||
|
||||
if ar.cpusetManager != nil {
|
||||
|
|
|
@ -7,40 +7,10 @@ import (
|
|||
multierror "github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
|
||||
clientconfig "github.com/hashicorp/nomad/client/config"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
"github.com/hashicorp/nomad/client/taskenv"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
type hookResourceSetter interface {
|
||||
GetAllocHookResources() *cstructs.AllocHookResources
|
||||
SetAllocHookResources(*cstructs.AllocHookResources)
|
||||
}
|
||||
|
||||
type allocHookResourceSetter struct {
|
||||
ar *allocRunner
|
||||
}
|
||||
|
||||
func (a *allocHookResourceSetter) GetAllocHookResources() *cstructs.AllocHookResources {
|
||||
a.ar.hookStateMu.RLock()
|
||||
defer a.ar.hookStateMu.RUnlock()
|
||||
|
||||
return a.ar.hookState
|
||||
}
|
||||
|
||||
func (a *allocHookResourceSetter) SetAllocHookResources(res *cstructs.AllocHookResources) {
|
||||
a.ar.hookStateMu.Lock()
|
||||
defer a.ar.hookStateMu.Unlock()
|
||||
|
||||
a.ar.hookState = res
|
||||
|
||||
// Propagate to all of the TRs within the lock to ensure consistent state.
|
||||
// TODO: Refactor so TR's pull state from AR?
|
||||
for _, tr := range a.ar.tasks {
|
||||
tr.SetAllocHookResources(res)
|
||||
}
|
||||
}
|
||||
|
||||
// allocHealthSetter is a shim to allow the alloc health watcher hook to set
|
||||
// and clear the alloc health without full access to the alloc runner state
|
||||
type allocHealthSetter struct {
|
||||
|
@ -117,10 +87,6 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
|
|||
// create network isolation setting shim
|
||||
ns := &allocNetworkIsolationSetter{ar: ar}
|
||||
|
||||
// create hook resource setting shim
|
||||
hrs := &allocHookResourceSetter{ar: ar}
|
||||
hrs.SetAllocHookResources(&cstructs.AllocHookResources{})
|
||||
|
||||
// build the network manager
|
||||
nm, err := newNetworkManager(ar.Alloc(), ar.driverManager)
|
||||
if err != nil {
|
||||
|
@ -166,7 +132,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
|
|||
}),
|
||||
newConsulGRPCSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig, config.Node.Attributes),
|
||||
newConsulHTTPSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
|
||||
newCSIHook(alloc, hookLogger, ar.csiManager, ar.rpcClient, ar, hrs, ar.clientConfig.Node.SecretID),
|
||||
newCSIHook(alloc, hookLogger, ar.csiManager, ar.rpcClient, ar, ar.hookResources, ar.clientConfig.Node.SecretID),
|
||||
newChecksHook(hookLogger, alloc, ar.checkStore, ar),
|
||||
}
|
||||
|
||||
|
|
|
@ -9,7 +9,9 @@ import (
|
|||
|
||||
hclog "github.com/hashicorp/go-hclog"
|
||||
multierror "github.com/hashicorp/go-multierror"
|
||||
|
||||
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/plugins/drivers"
|
||||
|
@ -27,7 +29,7 @@ type csiHook struct {
|
|||
// interfaces implemented by the allocRunner
|
||||
rpcClient RPCer
|
||||
taskCapabilityGetter taskCapabilityGetter
|
||||
updater hookResourceSetter
|
||||
hookResources *cstructs.AllocHookResources
|
||||
|
||||
nodeSecret string
|
||||
volumeRequests map[string]*volumeAndRequest
|
||||
|
@ -44,7 +46,7 @@ type taskCapabilityGetter interface {
|
|||
GetTaskDriverCapabilities(string) (*drivers.Capabilities, error)
|
||||
}
|
||||
|
||||
func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.Manager, rpcClient RPCer, taskCapabilityGetter taskCapabilityGetter, updater hookResourceSetter, nodeSecret string) *csiHook {
|
||||
func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.Manager, rpcClient RPCer, taskCapabilityGetter taskCapabilityGetter, hookResources *cstructs.AllocHookResources, nodeSecret string) *csiHook {
|
||||
|
||||
shutdownCtx, shutdownCancelFn := context.WithCancel(context.Background())
|
||||
|
||||
|
@ -54,7 +56,7 @@ func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.M
|
|||
csimanager: csi,
|
||||
rpcClient: rpcClient,
|
||||
taskCapabilityGetter: taskCapabilityGetter,
|
||||
updater: updater,
|
||||
hookResources: hookResources,
|
||||
nodeSecret: nodeSecret,
|
||||
volumeRequests: map[string]*volumeAndRequest{},
|
||||
minBackoffInterval: time.Second,
|
||||
|
@ -108,9 +110,8 @@ func (c *csiHook) Prerun() error {
|
|||
mounts[alias] = mountInfo
|
||||
}
|
||||
|
||||
res := c.updater.GetAllocHookResources()
|
||||
res.CSIMounts = mounts
|
||||
c.updater.SetAllocHookResources(res)
|
||||
// make the mounts available to the taskrunner's volume_hook
|
||||
c.hookResources.SetCSIMounts(mounts)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -203,7 +203,7 @@ func TestCSIHook(t *testing.T) {
|
|||
MountConfigs: drivers.MountConfigSupportAll,
|
||||
},
|
||||
}
|
||||
hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar, "secret")
|
||||
hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar.res, "secret")
|
||||
hook.minBackoffInterval = 1 * time.Millisecond
|
||||
hook.maxBackoffInterval = 10 * time.Millisecond
|
||||
hook.maxBackoffDuration = 500 * time.Millisecond
|
||||
|
@ -212,11 +212,11 @@ func TestCSIHook(t *testing.T) {
|
|||
|
||||
if tc.expectedClaimErr != nil {
|
||||
require.EqualError(t, hook.Prerun(), tc.expectedClaimErr.Error())
|
||||
mounts := ar.GetAllocHookResources().GetCSIMounts()
|
||||
mounts := ar.res.GetCSIMounts()
|
||||
require.Nil(t, mounts)
|
||||
} else {
|
||||
require.NoError(t, hook.Prerun())
|
||||
mounts := ar.GetAllocHookResources().GetCSIMounts()
|
||||
mounts := ar.res.GetCSIMounts()
|
||||
require.NotNil(t, mounts)
|
||||
require.Equal(t, tc.expectedMounts, mounts)
|
||||
require.NoError(t, hook.Postrun())
|
||||
|
@ -308,16 +308,16 @@ func TestCSIHook_claimVolumesFromAlloc_Validation(t *testing.T) {
|
|||
capFunc: tc.capFunc,
|
||||
}
|
||||
|
||||
hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar, "secret")
|
||||
hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar.res, "secret")
|
||||
require.NotNil(t, hook)
|
||||
|
||||
if tc.expectedClaimErr != nil {
|
||||
require.EqualError(t, hook.Prerun(), tc.expectedClaimErr.Error())
|
||||
mounts := ar.GetAllocHookResources().GetCSIMounts()
|
||||
mounts := ar.res.GetCSIMounts()
|
||||
require.Nil(t, mounts)
|
||||
} else {
|
||||
require.NoError(t, hook.Prerun())
|
||||
mounts := ar.GetAllocHookResources().GetCSIMounts()
|
||||
mounts := ar.res.GetCSIMounts()
|
||||
require.NotNil(t, mounts)
|
||||
require.NoError(t, hook.Postrun())
|
||||
}
|
||||
|
@ -431,14 +431,6 @@ type mockAllocRunner struct {
|
|||
capFunc func() (*drivers.Capabilities, error)
|
||||
}
|
||||
|
||||
func (ar mockAllocRunner) GetAllocHookResources() *cstructs.AllocHookResources {
|
||||
return ar.res
|
||||
}
|
||||
|
||||
func (ar mockAllocRunner) SetAllocHookResources(res *cstructs.AllocHookResources) {
|
||||
ar.res = res
|
||||
}
|
||||
|
||||
func (ar mockAllocRunner) GetTaskDriverCapabilities(taskName string) (*drivers.Capabilities, error) {
|
||||
if ar.capFunc != nil {
|
||||
return ar.capFunc()
|
||||
|
|
|
@ -2,6 +2,7 @@ package interfaces
|
|||
|
||||
import (
|
||||
"github.com/hashicorp/nomad/client/allocrunner/state"
|
||||
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
)
|
||||
|
||||
|
@ -32,3 +33,9 @@ type TaskStateHandler interface {
|
|||
type AllocStatsReporter interface {
|
||||
LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error)
|
||||
}
|
||||
|
||||
// HookResourceSetter is used to communicate between alloc hooks and task hooks
|
||||
type HookResourceSetter interface {
|
||||
SetCSIMounts(map[string]*csimanager.MountInfo)
|
||||
GetCSIMounts(map[string]*csimanager.MountInfo)
|
||||
}
|
||||
|
|
|
@ -175,6 +175,9 @@ type TaskRunner struct {
|
|||
// hookResources captures the resources provided by hooks
|
||||
hookResources *hookResources
|
||||
|
||||
// allocHookResources captures the resources provided by the allocrunner hooks
|
||||
allocHookResources *cstructs.AllocHookResources
|
||||
|
||||
// consulClient is the client used by the consul service hook for
|
||||
// registering services and checks
|
||||
consulServiceClient serviceregistration.Handler
|
||||
|
@ -253,8 +256,6 @@ type TaskRunner struct {
|
|||
networkIsolationLock sync.Mutex
|
||||
networkIsolationSpec *drivers.NetworkIsolationSpec
|
||||
|
||||
allocHookResources *cstructs.AllocHookResources
|
||||
|
||||
// serviceRegWrapper is the handler wrapper that is used by service hooks
|
||||
// to perform service and check registration and deregistration.
|
||||
serviceRegWrapper *wrapper.HandlerWrapper
|
||||
|
@ -329,6 +330,10 @@ type Config struct {
|
|||
|
||||
// Getter is an interface for retrieving artifacts.
|
||||
Getter cinterfaces.ArtifactGetter
|
||||
|
||||
// AllocHookResources is how taskrunner hooks can get state written by
|
||||
// allocrunner hooks
|
||||
AllocHookResources *cstructs.AllocHookResources
|
||||
}
|
||||
|
||||
func NewTaskRunner(config *Config) (*TaskRunner, error) {
|
||||
|
@ -368,6 +373,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
|
|||
vaultClient: config.Vault,
|
||||
state: tstate,
|
||||
localState: state.NewLocalState(),
|
||||
allocHookResources: config.AllocHookResources,
|
||||
stateDB: config.StateDB,
|
||||
stateUpdater: config.StateUpdater,
|
||||
deviceStatsReporter: config.DeviceStatsReporter,
|
||||
|
@ -1586,10 +1592,6 @@ func (tr *TaskRunner) DriverCapabilities() (*drivers.Capabilities, error) {
|
|||
return tr.driver.Capabilities()
|
||||
}
|
||||
|
||||
func (tr *TaskRunner) SetAllocHookResources(res *cstructs.AllocHookResources) {
|
||||
tr.allocHookResources = res
|
||||
}
|
||||
|
||||
// shutdownDelayCancel is used for testing only and cancels the
|
||||
// shutdownDelayCtx
|
||||
func (tr *TaskRunner) shutdownDelayCancel() {
|
||||
|
|
|
@ -132,16 +132,15 @@ func TestVolumeHook_prepareCSIVolumes(t *testing.T) {
|
|||
t.Run(tc.Name, func(t *testing.T) {
|
||||
|
||||
tr := &TaskRunner{
|
||||
task: req.Task,
|
||||
driver: tc.Driver,
|
||||
allocHookResources: &cstructs.AllocHookResources{
|
||||
CSIMounts: map[string]*csimanager.MountInfo{
|
||||
"foo": {
|
||||
Source: "/mnt/my-test-volume",
|
||||
},
|
||||
},
|
||||
},
|
||||
task: req.Task,
|
||||
driver: tc.Driver,
|
||||
allocHookResources: cstructs.NewAllocHookResources(),
|
||||
}
|
||||
tr.allocHookResources.SetCSIMounts(map[string]*csimanager.MountInfo{
|
||||
"foo": {
|
||||
Source: "/mnt/my-test-volume",
|
||||
},
|
||||
})
|
||||
|
||||
hook := &volumeHook{
|
||||
logger: testlog.HCLogger(t),
|
||||
|
|
|
@ -13,6 +13,16 @@ type MountInfo struct {
|
|||
IsDevice bool
|
||||
}
|
||||
|
||||
func (mi *MountInfo) Copy() *MountInfo {
|
||||
if mi == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
nmi := new(MountInfo)
|
||||
*nmi = *mi
|
||||
return nmi
|
||||
}
|
||||
|
||||
type UsageOptions struct {
|
||||
ReadOnly bool
|
||||
AttachmentMode structs.CSIVolumeAttachmentMode
|
||||
|
|
|
@ -4,26 +4,39 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
)
|
||||
|
||||
// AllocHookResources contains data that is provided by AllocRunner Hooks for
|
||||
// consumption by TaskRunners
|
||||
// consumption by TaskRunners. This should be instantiated once in the
|
||||
// AllocRunner and then only accessed via getters and setters that hold the
|
||||
// lock.
|
||||
type AllocHookResources struct {
|
||||
CSIMounts map[string]*csimanager.MountInfo
|
||||
csiMounts map[string]*csimanager.MountInfo
|
||||
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func NewAllocHookResources() *AllocHookResources {
|
||||
return &AllocHookResources{
|
||||
csiMounts: map[string]*csimanager.MountInfo{},
|
||||
}
|
||||
}
|
||||
|
||||
// GetCSIMounts returns a copy of the CSI mount info previously written by the
|
||||
// CSI allocrunner hook
|
||||
func (a *AllocHookResources) GetCSIMounts() map[string]*csimanager.MountInfo {
|
||||
a.mu.RLock()
|
||||
defer a.mu.RUnlock()
|
||||
|
||||
return a.CSIMounts
|
||||
return helper.DeepCopyMap(a.csiMounts)
|
||||
}
|
||||
|
||||
// SetCSIMounts stores the CSI mount info for later use by the volume taskrunner
|
||||
// hook
|
||||
func (a *AllocHookResources) SetCSIMounts(m map[string]*csimanager.MountInfo) {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
|
||||
a.CSIMounts = m
|
||||
a.csiMounts = m
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue