gracefully recover tasks that use csi node plugins (#16809)
Add a new WaitForPlugin() call to csiHook.Prerun so that, on startup, clients can recover running tasks that use CSI volumes. Previously those tasks were terminated and rescheduled because the node plugin they need was "not found" *yet*, only because the plugin task itself had not yet been recovered.
This commit is contained in:
parent
1335543731
commit
fa33ee567a
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
csi: gracefully recover tasks that use csi node plugins
|
||||||
|
```
|
|
@ -9,7 +9,7 @@ import (
|
||||||
|
|
||||||
hclog "github.com/hashicorp/go-hclog"
|
hclog "github.com/hashicorp/go-hclog"
|
||||||
multierror "github.com/hashicorp/go-multierror"
|
multierror "github.com/hashicorp/go-multierror"
|
||||||
|
"github.com/hashicorp/nomad/client/dynamicplugins"
|
||||||
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
|
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
|
||||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||||
"github.com/hashicorp/nomad/helper"
|
"github.com/hashicorp/nomad/helper"
|
||||||
|
@ -85,11 +85,15 @@ func (c *csiHook) Prerun() error {
|
||||||
mounts := make(map[string]*csimanager.MountInfo, len(volumes))
|
mounts := make(map[string]*csimanager.MountInfo, len(volumes))
|
||||||
for alias, pair := range volumes {
|
for alias, pair := range volumes {
|
||||||
|
|
||||||
// We use this context only to attach hclog to the gRPC
|
// make sure the plugin is ready or becomes so quickly.
|
||||||
// context. The lifetime is the lifetime of the gRPC stream,
|
plugin := pair.volume.PluginID
|
||||||
// not specific RPC timeouts, but we manage the stream
|
pType := dynamicplugins.PluginTypeCSINode
|
||||||
// lifetime via Close in the pluginmanager.
|
if err := c.csimanager.WaitForPlugin(c.shutdownCtx, pType, plugin); err != nil {
|
||||||
mounter, err := c.csimanager.MounterForPlugin(c.shutdownCtx, pair.volume.PluginID)
|
return err
|
||||||
|
}
|
||||||
|
c.logger.Debug("found CSI plugin", "type", pType, "name", plugin)
|
||||||
|
|
||||||
|
mounter, err := c.csimanager.MounterForPlugin(c.shutdownCtx, plugin)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -417,6 +417,10 @@ type mockPluginManager struct {
|
||||||
mounter mockVolumeMounter
|
mounter mockVolumeMounter
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mgr mockPluginManager) WaitForPlugin(ctx context.Context, pluginType, pluginID string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (mgr mockPluginManager) MounterForPlugin(ctx context.Context, pluginID string) (csimanager.VolumeMounter, error) {
|
func (mgr mockPluginManager) MounterForPlugin(ctx context.Context, pluginID string) (csimanager.VolumeMounter, error) {
|
||||||
return mgr.mounter, nil
|
return mgr.mounter, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,9 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/nomad/helper"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -22,6 +25,7 @@ type Registry interface {
|
||||||
RegisterPlugin(info *PluginInfo) error
|
RegisterPlugin(info *PluginInfo) error
|
||||||
DeregisterPlugin(ptype, name, allocID string) error
|
DeregisterPlugin(ptype, name, allocID string) error
|
||||||
|
|
||||||
|
WaitForPlugin(ctx context.Context, ptype, pname string) (*PluginInfo, error)
|
||||||
ListPlugins(ptype string) []*PluginInfo
|
ListPlugins(ptype string) []*PluginInfo
|
||||||
DispensePlugin(ptype, name string) (interface{}, error)
|
DispensePlugin(ptype, name string) (interface{}, error)
|
||||||
PluginForAlloc(ptype, name, allocID string) (*PluginInfo, error)
|
PluginForAlloc(ptype, name, allocID string) (*PluginInfo, error)
|
||||||
|
@ -301,6 +305,62 @@ func (d *dynamicRegistry) ListPlugins(ptype string) []*PluginInfo {
|
||||||
return plugins
|
return plugins
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WaitForPlugin repeatedly checks until a plugin with a given type and name
|
||||||
|
// becomes available or its context is canceled or times out.
|
||||||
|
// Callers should pass in a context with a sensible timeout
|
||||||
|
// for the plugin they're expecting to find.
|
||||||
|
func (d *dynamicRegistry) WaitForPlugin(ctx context.Context, ptype, name string) (*PluginInfo, error) {
|
||||||
|
// this is our actual goal, which may be run repeatedly
|
||||||
|
findPlugin := func() *PluginInfo {
|
||||||
|
for _, p := range d.ListPlugins(ptype) {
|
||||||
|
if p.Name == name {
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// try immediately first, before any timers get involved
|
||||||
|
if p := findPlugin(); p != nil {
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// next, loop until found or context is done
|
||||||
|
|
||||||
|
// these numbers are almost arbitrary...
|
||||||
|
delay := 200 // milliseconds between checks, will backoff
|
||||||
|
maxDelay := 5000 // up to 5 seconds between each check
|
||||||
|
|
||||||
|
// put a long upper bound on total time,
|
||||||
|
// just in case callers don't follow directions.
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, 24*time.Hour)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
timer, stop := helper.NewSafeTimer(time.Duration(delay) * time.Millisecond)
|
||||||
|
defer stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
// an externally-defined timeout wins the day
|
||||||
|
return nil, ctx.Err()
|
||||||
|
case <-timer.C:
|
||||||
|
// continue after our internal delay
|
||||||
|
}
|
||||||
|
|
||||||
|
if p := findPlugin(); p != nil {
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if delay < maxDelay {
|
||||||
|
delay += delay
|
||||||
|
}
|
||||||
|
if delay > maxDelay {
|
||||||
|
delay = maxDelay
|
||||||
|
}
|
||||||
|
timer.Reset(time.Duration(delay) * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (d *dynamicRegistry) DispensePlugin(ptype string, name string) (interface{}, error) {
|
func (d *dynamicRegistry) DispensePlugin(ptype string, name string) (interface{}, error) {
|
||||||
d.pluginsLock.Lock()
|
d.pluginsLock.Lock()
|
||||||
defer d.pluginsLock.Unlock()
|
defer d.pluginsLock.Unlock()
|
||||||
|
|
|
@ -59,6 +59,10 @@ type Manager interface {
|
||||||
// PluginManager returns a PluginManager for use by the node fingerprinter.
|
// PluginManager returns a PluginManager for use by the node fingerprinter.
|
||||||
PluginManager() pluginmanager.PluginManager
|
PluginManager() pluginmanager.PluginManager
|
||||||
|
|
||||||
|
// WaitForPlugin waits for the plugin to become available,
|
||||||
|
// or until its context is canceled or times out.
|
||||||
|
WaitForPlugin(ctx context.Context, pluginType, pluginID string) error
|
||||||
|
|
||||||
// MounterForPlugin returns a VolumeMounter for the plugin ID associated
|
// MounterForPlugin returns a VolumeMounter for the plugin ID associated
|
||||||
// with the volume. Returns an error if this plugin isn't registered.
|
// with the volume. Returns an error if this plugin isn't registered.
|
||||||
MounterForPlugin(ctx context.Context, pluginID string) (VolumeMounter, error)
|
MounterForPlugin(ctx context.Context, pluginID string) (VolumeMounter, error)
|
||||||
|
|
|
@ -39,7 +39,7 @@ func New(config *Config) Manager {
|
||||||
}
|
}
|
||||||
|
|
||||||
return &csiManager{
|
return &csiManager{
|
||||||
logger: config.Logger,
|
logger: config.Logger.Named("csi_manager"),
|
||||||
eventer: config.TriggerNodeEvent,
|
eventer: config.TriggerNodeEvent,
|
||||||
registry: config.DynamicRegistry,
|
registry: config.DynamicRegistry,
|
||||||
instances: make(map[string]map[string]*instanceManager),
|
instances: make(map[string]map[string]*instanceManager),
|
||||||
|
@ -75,6 +75,21 @@ func (c *csiManager) PluginManager() pluginmanager.PluginManager {
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WaitForPlugin waits for a specific plugin to be registered and available,
|
||||||
|
// unless the context is canceled, or it takes longer than a minute.
|
||||||
|
func (c *csiManager) WaitForPlugin(ctx context.Context, pType, pID string) error {
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
p, err := c.registry.WaitForPlugin(ctx, pType, pID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("%s plugin '%s' did not become ready: %w", pType, pID, err)
|
||||||
|
}
|
||||||
|
c.instancesLock.Lock()
|
||||||
|
defer c.instancesLock.Unlock()
|
||||||
|
c.ensureInstance(p)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *csiManager) MounterForPlugin(ctx context.Context, pluginID string) (VolumeMounter, error) {
|
func (c *csiManager) MounterForPlugin(ctx context.Context, pluginID string) (VolumeMounter, error) {
|
||||||
c.instancesLock.RLock()
|
c.instancesLock.RLock()
|
||||||
defer c.instancesLock.RUnlock()
|
defer c.instancesLock.RUnlock()
|
||||||
|
|
|
@ -1,15 +1,18 @@
|
||||||
package csimanager
|
package csimanager
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/nomad/ci"
|
||||||
"github.com/hashicorp/nomad/client/dynamicplugins"
|
"github.com/hashicorp/nomad/client/dynamicplugins"
|
||||||
"github.com/hashicorp/nomad/client/pluginmanager"
|
"github.com/hashicorp/nomad/client/pluginmanager"
|
||||||
"github.com/hashicorp/nomad/helper/testlog"
|
"github.com/hashicorp/nomad/helper/testlog"
|
||||||
"github.com/hashicorp/nomad/nomad/structs"
|
"github.com/hashicorp/nomad/nomad/structs"
|
||||||
|
"github.com/shoenig/test/must"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -93,6 +96,41 @@ func TestManager_DeregisterPlugin(t *testing.T) {
|
||||||
}, 5*time.Second, 10*time.Millisecond)
|
}, 5*time.Second, 10*time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestManager_WaitForPlugin(t *testing.T) {
|
||||||
|
ci.Parallel(t)
|
||||||
|
|
||||||
|
registry := setupRegistry(nil)
|
||||||
|
t.Cleanup(registry.Shutdown)
|
||||||
|
pm := testManager(t, registry, 5*time.Second) // resync period can be long.
|
||||||
|
t.Cleanup(pm.Shutdown)
|
||||||
|
pm.Run()
|
||||||
|
|
||||||
|
t.Run("never happens", func(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
||||||
|
t.Cleanup(cancel)
|
||||||
|
|
||||||
|
err := pm.WaitForPlugin(ctx, "bad-type", "bad-name")
|
||||||
|
must.Error(t, err)
|
||||||
|
must.ErrorContains(t, err, "did not become ready: context deadline exceeded")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("ok after delay", func(t *testing.T) {
|
||||||
|
plugin := fakePlugin(0, dynamicplugins.PluginTypeCSIController)
|
||||||
|
|
||||||
|
// register the plugin in the near future
|
||||||
|
time.AfterFunc(100*time.Millisecond, func() {
|
||||||
|
err := registry.RegisterPlugin(plugin)
|
||||||
|
must.NoError(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
||||||
|
t.Cleanup(cancel)
|
||||||
|
|
||||||
|
err := pm.WaitForPlugin(ctx, plugin.Type, plugin.Name)
|
||||||
|
must.NoError(t, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// TestManager_MultiplePlugins ensures that multiple plugins with the same
|
// TestManager_MultiplePlugins ensures that multiple plugins with the same
|
||||||
// name but different types (as found with monolith plugins) don't interfere
|
// name but different types (as found with monolith plugins) don't interfere
|
||||||
// with each other.
|
// with each other.
|
||||||
|
|
Loading…
Reference in New Issue