CSI: allow for concurrent plugin allocations (#12078)
The dynamic plugin registry assumes that plugins are singletons, which matches the behavior of other Nomad plugins. But because dynamic plugins like CSI are implemented by allocations, we need to handle the possibility of multiple allocations for a given plugin type + ID, as well as interleaved allocation starts and stops. Update the data structure for the dynamic registry so that the most recent allocation takes over as the instance manager singleton, while previously running allocations are preserved so that restores work without racing. Multiple allocations can run on a client for the same plugin, even if only during updates. Give each plugin task a unique path for its control socket so that the tasks don't interfere with each other.
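As an editor's illustration of the intended registry semantics (not code from this change; the plugin name, alloc IDs, and socket paths below are invented), registering a second allocation for the same plugin makes it the current instance, and deregistering the older allocation by its alloc ID leaves the newer one untouched:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/client/dynamicplugins"
)

func main() {
	// Dispenser that just hands back the PluginInfo. A nil StateStorage keeps
	// the sketch in-memory only, as the old csimanager tests did.
	dispense := func(i *dynamicplugins.PluginInfo) (interface{}, error) { return i, nil }
	reg := dynamicplugins.NewRegistry(nil, map[string]dynamicplugins.PluginDispenser{
		dynamicplugins.PluginTypeCSINode: dispense,
	})

	plugin := func(allocID string) *dynamicplugins.PluginInfo {
		return &dynamicplugins.PluginInfo{
			Type:    dynamicplugins.PluginTypeCSINode,
			Name:    "my-plugin",
			AllocID: allocID,
			ConnectionInfo: &dynamicplugins.PluginConnectionInfo{
				SocketPath: "/var/nomad/client/csi/plugins/" + allocID + "/csi.sock",
			},
		}
	}

	// Two allocations run the same plugin, e.g. during a job update.
	_ = reg.RegisterPlugin(plugin("alloc-0"))
	_ = reg.RegisterPlugin(plugin("alloc-1"))

	// The most recently registered allocation backs the plugin singleton.
	p, _ := reg.DispensePlugin(dynamicplugins.PluginTypeCSINode, "my-plugin")
	fmt.Println(p.(*dynamicplugins.PluginInfo).AllocID) // alloc-1

	// Deregistering is now scoped to an alloc ID, so stopping the old
	// allocation does not tear down its replacement.
	_ = reg.DeregisterPlugin(dynamicplugins.PluginTypeCSINode, "my-plugin", "alloc-0")
	p, _ = reg.DispensePlugin(dynamicplugins.PluginTypeCSINode, "my-plugin")
	fmt.Println(p.(*dynamicplugins.PluginInfo).AllocID) // still alloc-1
}
```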
Parent: e5a52b0b6f
Commit: 246db87a74
@ -0,0 +1,11 @@
```release-note:improvement
csi: Allow for concurrent plugin allocations
```

```release-note:breaking-change
client: The client state store will be automatically migrated to a new schema version when upgrading a client. Downgrading to a previous version of the client after upgrading it to Nomad 1.3 is not supported. To downgrade safely, users should erase the Nomad client's data directory.
```

```release-note:breaking-change
csi: The client filesystem layout for CSI plugins has been updated to correctly handle the lifecycle of multiple allocations serving the same plugin. Running plugin tasks will not be updated after upgrading the client, but it is recommended to redeploy CSI plugin jobs after upgrading the cluster.
```
|
@ -21,18 +21,20 @@ import (
|
|||
// tasks. These plugins will be fingerprinted and it will manage connecting them
|
||||
// to their requisite plugin manager.
|
||||
//
|
||||
// It provides a couple of things to a task running inside Nomad. These are:
|
||||
// * A mount to the `plugin_mount_dir`, that will then be used by Nomad
|
||||
// to connect to the nested plugin and handle volume mounts.
|
||||
// It provides a few things to a plugin task running inside Nomad. These are:
|
||||
// * A mount to the `csi_plugin.mount_dir` where the plugin will create its csi.sock
|
||||
// * A mount to `local/csi` that node plugins will use to stage volume mounts.
|
||||
// * When the task has started, it starts a loop of attempting to connect to the
|
||||
// plugin, to perform initial fingerprinting of the plugins capabilities before
|
||||
// notifying the plugin manager of the plugin.
|
||||
type csiPluginSupervisorHook struct {
|
||||
logger hclog.Logger
|
||||
alloc *structs.Allocation
|
||||
task *structs.Task
|
||||
runner *TaskRunner
|
||||
mountPoint string
|
||||
logger hclog.Logger
|
||||
alloc *structs.Allocation
|
||||
task *structs.Task
|
||||
runner *TaskRunner
|
||||
mountPoint string
|
||||
socketMountPoint string
|
||||
socketPath string
|
||||
|
||||
caps *drivers.Capabilities
|
||||
|
||||
|
@ -73,20 +75,36 @@ var _ interfaces.TaskPoststartHook = &csiPluginSupervisorHook{}
|
|||
// with the catalog and to ensure any mounts are cleaned up.
|
||||
var _ interfaces.TaskStopHook = &csiPluginSupervisorHook{}
|
||||
|
||||
// This hook creates a csi/ directory within the client's datadir used to
// manage plugins and volume mount points. The layout is as follows:
//
// plugins/
//    {alloc-id}/csi.sock
//       Per-allocation directories of unix domain sockets used to communicate
//       with the CSI plugin. Nomad creates the directory and the plugin creates
//       the socket file. This directory is bind-mounted to the
//       csi_plugin.mount_dir in the plugin task.
//
// {plugin-type}/{plugin-id}/
//    staging/
//       {volume-id}/{usage-mode}/
//          Intermediate mount point used by node plugins that support
//          NODE_STAGE_UNSTAGE capability.
//
//    per-alloc/
//       {alloc-id}/{volume-id}/{usage-mode}/
//          Mount point bound from the staging directory into tasks that use
//          the mounted volumes.
func newCSIPluginSupervisorHook(config *csiPluginSupervisorHookConfig) *csiPluginSupervisorHook {
|
||||
task := config.runner.Task()
|
||||
|
||||
// The Plugin directory will look something like this:
|
||||
// .
|
||||
// ..
|
||||
// csi.sock - A unix domain socket used to communicate with the CSI Plugin
|
||||
// staging/
|
||||
// {volume-id}/{usage-mode-hash}/ - Intermediary mount point that will be used by plugins that support NODE_STAGE_UNSTAGE capabilities.
|
||||
// per-alloc/
|
||||
// {alloc-id}/{volume-id}/{usage-mode-hash}/ - Mount Point that will be bind-mounted into tasks that utilise the volume
|
||||
pluginRoot := filepath.Join(config.clientStateDirPath, "csi",
|
||||
string(task.CSIPluginConfig.Type), task.CSIPluginConfig.ID)
|
||||
|
||||
socketMountPoint := filepath.Join(config.clientStateDirPath, "csi",
|
||||
"plugins", config.runner.Alloc().ID)
|
||||
|
||||
shutdownCtx, cancelFn := context.WithCancel(context.Background())
|
||||
|
||||
hook := &csiPluginSupervisorHook{
|
||||
|
@ -96,6 +114,7 @@ func newCSIPluginSupervisorHook(config *csiPluginSupervisorHookConfig) *csiPlugi
|
|||
logger: config.logger,
|
||||
task: task,
|
||||
mountPoint: pluginRoot,
|
||||
socketMountPoint: socketMountPoint,
|
||||
caps: config.capabilities,
|
||||
shutdownCtx: shutdownCtx,
|
||||
shutdownCancelFn: cancelFn,
|
||||
|
@ -122,18 +141,46 @@ func (h *csiPluginSupervisorHook) Prestart(ctx context.Context,
|
|||
return fmt.Errorf("failed to create mount point: %v", err)
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(h.socketMountPoint, 0700); err != nil && !os.IsExist(err) {
|
||||
return fmt.Errorf("failed to create socket mount point: %v", err)
|
||||
}
|
||||
|
||||
// where the socket will be mounted
|
||||
configMount := &drivers.MountConfig{
|
||||
TaskPath: h.task.CSIPluginConfig.MountDir,
|
||||
HostPath: h.socketMountPoint,
|
||||
Readonly: false,
|
||||
PropagationMode: "bidirectional",
|
||||
}
|
||||
// where the staging and per-alloc directories will be mounted
|
||||
volumeStagingMounts := &drivers.MountConfig{
|
||||
// TODO(tgross): add this TaskPath to the CSIPluginConfig as well
|
||||
TaskPath: "/local/csi",
|
||||
HostPath: h.mountPoint,
|
||||
Readonly: false,
|
||||
PropagationMode: "bidirectional",
|
||||
}
|
||||
// devices from the host
|
||||
devMount := &drivers.MountConfig{
|
||||
TaskPath: "/dev",
|
||||
HostPath: "/dev",
|
||||
Readonly: false,
|
||||
}
|
||||
|
||||
// TODO(tgross): https://github.com/hashicorp/nomad/issues/11786
|
||||
// If we're already registered, we should be able to update the
|
||||
// definition in the update hook
|
||||
|
||||
// For backwards compatibility, ensure that we don't overwrite the
|
||||
// socketPath on client restart with existing plugin allocations.
|
||||
pluginInfo, _ := h.runner.dynamicRegistry.PluginForAlloc(
|
||||
string(h.task.CSIPluginConfig.Type), h.task.CSIPluginConfig.ID, h.alloc.ID)
|
||||
if pluginInfo != nil {
|
||||
h.socketPath = pluginInfo.ConnectionInfo.SocketPath
|
||||
} else {
|
||||
h.socketPath = filepath.Join(h.socketMountPoint, structs.CSISocketName)
|
||||
}
|
||||
|
||||
switch h.caps.FSIsolation {
|
||||
case drivers.FSIsolationNone:
|
||||
// Plugin tasks with no filesystem isolation won't have the
|
||||
|
@ -142,13 +189,15 @@ func (h *csiPluginSupervisorHook) Prestart(ctx context.Context,
|
|||
// plugins will need to be aware of the csi directory layout
|
||||
// in the client data dir
|
||||
resp.Env = map[string]string{
|
||||
"CSI_ENDPOINT": filepath.Join(h.mountPoint, "csi.sock")}
|
||||
"CSI_ENDPOINT": h.socketPath}
|
||||
default:
|
||||
resp.Env = map[string]string{
|
||||
"CSI_ENDPOINT": filepath.Join(h.task.CSIPluginConfig.MountDir, "csi.sock")}
|
||||
"CSI_ENDPOINT": filepath.Join(
|
||||
h.task.CSIPluginConfig.MountDir, structs.CSISocketName)}
|
||||
}
|
||||
|
||||
mounts := ensureMountpointInserted(h.runner.hookResources.getMounts(), configMount)
|
||||
mounts = ensureMountpointInserted(mounts, volumeStagingMounts)
|
||||
mounts = ensureMountpointInserted(mounts, devMount)
|
||||
|
||||
h.runner.hookResources.setMounts(mounts)
|
||||
|
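To make the new socket layout concrete, here is a rough sketch of the two sides of the bind mount set up above. The client data dir, alloc ID, and `mount_dir` values are invented for illustration; only the `csi/plugins/{alloc-id}/csi.sock` layout and the `local/csi` staging mount come from this change:

```go
package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	// Illustrative inputs; in the hook these come from the client config,
	// the allocation, and the task's csi_plugin block.
	clientStateDir := "/var/nomad/client"
	allocID := "8a33f8ad-aaaa-bbbb-cccc-000000000001"
	mountDir := "/csi" // csi_plugin.mount_dir

	// Host side: a per-allocation socket directory, so concurrent
	// allocations of the same plugin never share a csi.sock.
	hostSocketDir := filepath.Join(clientStateDir, "csi", "plugins", allocID)
	fmt.Println(filepath.Join(hostSocketDir, "csi.sock"))
	// -> /var/nomad/client/csi/plugins/8a33f8ad-.../csi.sock

	// Task side: that directory is bind-mounted to csi_plugin.mount_dir, so
	// for isolated filesystems CSI_ENDPOINT stays the same across allocations.
	fmt.Println(filepath.Join(mountDir, "csi.sock"))
	// -> /csi/csi.sock

	// Staging and per-alloc volume mounts are bind-mounted to local/csi in
	// the task, backed by the shared {plugin-type}/{plugin-id} directory.
	fmt.Println(filepath.Join(clientStateDir, "csi", "csi-node", "my-plugin"))
	// -> /var/nomad/client/csi/csi-node/my-plugin (mounted at /local/csi)
}
```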
@ -203,9 +252,7 @@ func (h *csiPluginSupervisorHook) ensureSupervisorLoop(ctx context.Context) {
|
|||
h.runningLock.Unlock()
|
||||
}()
|
||||
|
||||
socketPath := filepath.Join(h.mountPoint, structs.CSISocketName)
|
||||
|
||||
client := csi.NewClient(socketPath, h.logger.Named("csi_client").With(
|
||||
client := csi.NewClient(h.socketPath, h.logger.Named("csi_client").With(
|
||||
"plugin.name", h.task.CSIPluginConfig.ID,
|
||||
"plugin.type", h.task.CSIPluginConfig.Type))
|
||||
defer client.Close()
|
||||
|
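For context on what the fingerprinting loop waits for: the client above is Nomad's internal CSI wrapper around a gRPC connection to that socket. Purely as a hedged illustration (this is not Nomad's code path), a raw CSI Identity `Probe` against the per-alloc socket looks roughly like this, assuming the standard CSI spec Go bindings and grpc-go:

```go
package main

import (
	"context"
	"fmt"
	"time"

	csipb "github.com/container-storage-interface/spec/lib/go/csi"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Illustrative path; on Nomad 1.3 clients the socket lives in the
	// per-allocation directory under <data_dir>/csi/plugins/.
	socketPath := "/var/nomad/client/csi/plugins/example-alloc/csi.sock"

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// CSI plugins serve gRPC over a unix domain socket.
	conn, err := grpc.DialContext(ctx, "unix://"+socketPath,
		grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
	if err != nil {
		fmt.Println("dial failed:", err)
		return
	}
	defer conn.Close()

	// The supervisor loop keeps probing until the plugin reports ready,
	// then fingerprints its capabilities and registers the plugin.
	identity := csipb.NewIdentityClient(conn)
	resp, err := identity.Probe(ctx, &csipb.ProbeRequest{})
	if err != nil {
		fmt.Println("probe failed:", err)
		return
	}
	fmt.Println("plugin ready:", resp.GetReady().GetValue())
}
```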
@ -249,7 +296,7 @@ WAITFORREADY:
|
|||
}
|
||||
|
||||
// Step 2: Register the plugin with the catalog.
|
||||
deregisterPluginFn, err := h.registerPlugin(client, socketPath)
|
||||
deregisterPluginFn, err := h.registerPlugin(client, h.socketPath)
|
||||
if err != nil {
|
||||
h.kill(ctx, fmt.Errorf("CSI plugin failed to register: %v", err))
|
||||
return
|
||||
|
@ -317,7 +364,7 @@ func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPat
|
|||
Options: map[string]string{
|
||||
"Provider": info.Name, // vendor name
|
||||
"MountPoint": h.mountPoint,
|
||||
"ContainerMountPoint": h.task.CSIPluginConfig.MountDir,
|
||||
"ContainerMountPoint": "/local/csi",
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -348,8 +395,9 @@ func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPat
|
|||
// closes over its own registration
|
||||
rname := reg.Name
|
||||
rtype := reg.Type
|
||||
allocID := reg.AllocID
|
||||
deregistrationFns = append(deregistrationFns, func() {
|
||||
err := h.runner.dynamicRegistry.DeregisterPlugin(rtype, rname)
|
||||
err := h.runner.dynamicRegistry.DeregisterPlugin(rtype, rname, allocID)
|
||||
if err != nil {
|
||||
h.logger.Error("failed to deregister csi plugin", "name", rname, "type", rtype, "error", err)
|
||||
}
|
||||
|
@ -384,6 +432,10 @@ func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, client
|
|||
// Stop hooks must be idempotent. The context is cancelled prematurely if the
|
||||
// task is killed.
|
||||
func (h *csiPluginSupervisorHook) Stop(_ context.Context, req *interfaces.TaskStopRequest, _ *interfaces.TaskStopResponse) error {
|
||||
err := os.RemoveAll(h.socketMountPoint)
|
||||
if err != nil {
|
||||
h.logger.Error("could not remove plugin socket directory", "dir", h.socketMountPoint, "error", err)
|
||||
}
|
||||
h.shutdownCancelFn()
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
package dynamicplugins
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
@ -19,10 +20,11 @@ const (
|
|||
// that are running as Nomad Tasks.
|
||||
type Registry interface {
|
||||
RegisterPlugin(info *PluginInfo) error
|
||||
DeregisterPlugin(ptype, name string) error
|
||||
DeregisterPlugin(ptype, name, allocID string) error
|
||||
|
||||
ListPlugins(ptype string) []*PluginInfo
|
||||
DispensePlugin(ptype, name string) (interface{}, error)
|
||||
PluginForAlloc(ptype, name, allocID string) (*PluginInfo, error)
|
||||
|
||||
PluginsUpdatedCh(ctx context.Context, ptype string) <-chan *PluginUpdateEvent
|
||||
|
||||
|
@ -31,10 +33,11 @@ type Registry interface {
|
|||
StubDispenserForType(ptype string, dispenser PluginDispenser)
|
||||
}
|
||||
|
||||
// RegistryState is what we persist in the client state store. It contains
|
||||
// a map of plugin types to maps of plugin name -> PluginInfo.
|
||||
// RegistryState is what we persist in the client state
|
||||
// store. It contains a map of plugin types to maps of plugin name ->
|
||||
// list of *PluginInfo, sorted by recency of registration
|
||||
type RegistryState struct {
|
||||
Plugins map[string]map[string]*PluginInfo
|
||||
Plugins map[string]map[string]*list.List
|
||||
}
|
||||
|
||||
type PluginDispenser func(info *PluginInfo) (interface{}, error)
|
||||
|
@ -44,7 +47,7 @@ type PluginDispenser func(info *PluginInfo) (interface{}, error)
|
|||
func NewRegistry(state StateStorage, dispensers map[string]PluginDispenser) Registry {
|
||||
|
||||
registry := &dynamicRegistry{
|
||||
plugins: make(map[string]map[string]*PluginInfo),
|
||||
plugins: make(map[string]map[string]*list.List),
|
||||
broadcasters: make(map[string]*pluginEventBroadcaster),
|
||||
dispensers: dispensers,
|
||||
state: state,
|
||||
|
@ -122,7 +125,7 @@ type PluginUpdateEvent struct {
|
|||
}
|
||||
|
||||
type dynamicRegistry struct {
|
||||
plugins map[string]map[string]*PluginInfo
|
||||
plugins map[string]map[string]*list.List
|
||||
pluginsLock sync.RWMutex
|
||||
|
||||
broadcasters map[string]*pluginEventBroadcaster
|
||||
|
@ -180,18 +183,35 @@ func (d *dynamicRegistry) RegisterPlugin(info *PluginInfo) error {
|
|||
|
||||
pmap, ok := d.plugins[info.Type]
|
||||
if !ok {
|
||||
pmap = make(map[string]*PluginInfo, 1)
|
||||
pmap = make(map[string]*list.List)
|
||||
d.plugins[info.Type] = pmap
|
||||
}
|
||||
|
||||
pmap[info.Name] = info
|
||||
|
||||
broadcaster := d.broadcasterForPluginType(info.Type)
|
||||
event := &PluginUpdateEvent{
|
||||
EventType: EventTypeRegistered,
|
||||
Info: info,
|
||||
infos, ok := pmap[info.Name]
|
||||
if !ok {
|
||||
infos = list.New()
|
||||
pmap[info.Name] = infos
|
||||
}
|
||||
|
||||
// TODO(tgross): https://github.com/hashicorp/nomad/issues/11786
|
||||
// If we're already registered, we should update the definition
|
||||
// and send a broadcast of any update so the instanceManager can
|
||||
// be restarted if there's been a change
|
||||
var alreadyRegistered bool
|
||||
for e := infos.Front(); e != nil; e = e.Next() {
|
||||
if e.Value.(*PluginInfo).AllocID == info.AllocID {
|
||||
alreadyRegistered = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !alreadyRegistered {
|
||||
infos.PushFront(info)
|
||||
broadcaster := d.broadcasterForPluginType(info.Type)
|
||||
event := &PluginUpdateEvent{
|
||||
EventType: EventTypeRegistered,
|
||||
Info: info,
|
||||
}
|
||||
broadcaster.broadcast(event)
|
||||
}
|
||||
broadcaster.broadcast(event)
|
||||
|
||||
return d.sync()
|
||||
}
|
||||
|
@ -209,7 +229,7 @@ func (d *dynamicRegistry) broadcasterForPluginType(ptype string) *pluginEventBro
|
|||
return broadcaster
|
||||
}
|
||||
|
||||
func (d *dynamicRegistry) DeregisterPlugin(ptype, name string) error {
|
||||
func (d *dynamicRegistry) DeregisterPlugin(ptype, name, allocID string) error {
|
||||
d.pluginsLock.Lock()
|
||||
defer d.pluginsLock.Unlock()
|
||||
|
||||
|
@ -223,6 +243,9 @@ func (d *dynamicRegistry) DeregisterPlugin(ptype, name string) error {
|
|||
// developers during the development of new plugin types.
|
||||
return errors.New("must specify plugin name to deregister")
|
||||
}
|
||||
if allocID == "" {
|
||||
return errors.New("must specify plugin allocation ID to deregister")
|
||||
}
|
||||
|
||||
pmap, ok := d.plugins[ptype]
|
||||
if !ok {
|
||||
|
@ -230,12 +253,20 @@ func (d *dynamicRegistry) DeregisterPlugin(ptype, name string) error {
|
|||
return fmt.Errorf("no plugins registered for type: %s", ptype)
|
||||
}
|
||||
|
||||
info, ok := pmap[name]
|
||||
infos, ok := pmap[name]
|
||||
if !ok {
|
||||
// plugin already deregistered, don't send events or try re-deleting.
|
||||
return nil
|
||||
}
|
||||
delete(pmap, name)
|
||||
|
||||
var info *PluginInfo
|
||||
for e := infos.Front(); e != nil; e = e.Next() {
|
||||
info = e.Value.(*PluginInfo)
|
||||
if info.AllocID == allocID {
|
||||
infos.Remove(e)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
broadcaster := d.broadcasterForPluginType(ptype)
|
||||
event := &PluginUpdateEvent{
|
||||
|
@ -259,7 +290,9 @@ func (d *dynamicRegistry) ListPlugins(ptype string) []*PluginInfo {
|
|||
plugins := make([]*PluginInfo, 0, len(pmap))
|
||||
|
||||
for _, info := range pmap {
|
||||
plugins = append(plugins, info)
|
||||
if info.Front() != nil {
|
||||
plugins = append(plugins, info.Front().Value.(*PluginInfo))
|
||||
}
|
||||
}
|
||||
|
||||
return plugins
|
||||
|
@ -302,11 +335,32 @@ func (d *dynamicRegistry) DispensePlugin(ptype string, name string) (interface{}
|
|||
}
|
||||
|
||||
info, ok := pmap[name]
|
||||
if !ok {
|
||||
if !ok || info.Front() == nil {
|
||||
return nil, fmt.Errorf("plugin %s for type %s not found", name, ptype)
|
||||
}
|
||||
|
||||
return dispenseFunc(info)
|
||||
return dispenseFunc(info.Front().Value.(*PluginInfo))
|
||||
}
|
||||
|
||||
func (d *dynamicRegistry) PluginForAlloc(ptype, name, allocID string) (*PluginInfo, error) {
|
||||
d.pluginsLock.Lock()
|
||||
defer d.pluginsLock.Unlock()
|
||||
|
||||
pmap, ok := d.plugins[ptype]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("no plugins registered for type: %s", ptype)
|
||||
}
|
||||
|
||||
infos, ok := pmap[name]
|
||||
if ok {
|
||||
for e := infos.Front(); e != nil; e = e.Next() {
|
||||
plugin := e.Value.(*PluginInfo)
|
||||
if plugin.AllocID == allocID {
|
||||
return plugin, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("no plugin for that allocation")
|
||||
}
|
||||
|
||||
// PluginsUpdatedCh returns a channel over which plugin events for the requested
|
||||
|
|
|
@ -2,6 +2,7 @@ package dynamicplugins
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -132,11 +133,12 @@ func TestDynamicRegistry_DeregisterPlugin_SendsUpdateEvents(t *testing.T) {
|
|||
err := r.RegisterPlugin(&PluginInfo{
|
||||
Type: "csi",
|
||||
Name: "my-plugin",
|
||||
AllocID: "alloc-0",
|
||||
ConnectionInfo: &PluginConnectionInfo{},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = r.DeregisterPlugin("csi", "my-plugin")
|
||||
err = r.DeregisterPlugin("csi", "my-plugin", "alloc-0")
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
|
@ -178,6 +180,7 @@ func TestDynamicRegistry_IsolatePluginTypes(t *testing.T) {
|
|||
err := r.RegisterPlugin(&PluginInfo{
|
||||
Type: PluginTypeCSIController,
|
||||
Name: "my-plugin",
|
||||
AllocID: "alloc-0",
|
||||
ConnectionInfo: &PluginConnectionInfo{},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
@ -185,14 +188,15 @@ func TestDynamicRegistry_IsolatePluginTypes(t *testing.T) {
|
|||
err = r.RegisterPlugin(&PluginInfo{
|
||||
Type: PluginTypeCSINode,
|
||||
Name: "my-plugin",
|
||||
AllocID: "alloc-1",
|
||||
ConnectionInfo: &PluginConnectionInfo{},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = r.DeregisterPlugin(PluginTypeCSIController, "my-plugin")
|
||||
err = r.DeregisterPlugin(PluginTypeCSIController, "my-plugin", "alloc-0")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(r.ListPlugins(PluginTypeCSINode)), 1)
|
||||
require.Equal(t, len(r.ListPlugins(PluginTypeCSIController)), 0)
|
||||
require.Equal(t, 1, len(r.ListPlugins(PluginTypeCSINode)))
|
||||
require.Equal(t, 0, len(r.ListPlugins(PluginTypeCSIController)))
|
||||
}
|
||||
|
||||
func TestDynamicRegistry_StateStore(t *testing.T) {
|
||||
|
@ -221,6 +225,120 @@ func TestDynamicRegistry_StateStore(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestDynamicRegistry_ConcurrentAllocs(t *testing.T) {
|
||||
|
||||
t.Parallel()
|
||||
dispenseFn := func(i *PluginInfo) (interface{}, error) {
|
||||
return i, nil
|
||||
}
|
||||
|
||||
newPlugin := func(idx int) *PluginInfo {
|
||||
id := fmt.Sprintf("alloc-%d", idx)
|
||||
return &PluginInfo{
|
||||
Name: "my-plugin",
|
||||
Type: PluginTypeCSINode,
|
||||
Version: fmt.Sprintf("v%d", idx),
|
||||
ConnectionInfo: &PluginConnectionInfo{
|
||||
SocketPath: "/var/data/alloc/" + id + "/csi.sock"},
|
||||
AllocID: id,
|
||||
}
|
||||
}
|
||||
|
||||
dispensePlugin := func(t *testing.T, reg Registry) *PluginInfo {
|
||||
result, err := reg.DispensePlugin(PluginTypeCSINode, "my-plugin")
|
||||
require.NotNil(t, result)
|
||||
require.NoError(t, err)
|
||||
plugin := result.(*PluginInfo)
|
||||
return plugin
|
||||
}
|
||||
|
||||
t.Run("restore races on client restart", func(t *testing.T) {
|
||||
plugin0 := newPlugin(0)
|
||||
plugin1 := newPlugin(1)
|
||||
|
||||
memdb := &MemDB{}
|
||||
oldR := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn})
|
||||
|
||||
// add a plugin and a new alloc running the same plugin
|
||||
// (without stopping the old one)
|
||||
require.NoError(t, oldR.RegisterPlugin(plugin0))
|
||||
require.NoError(t, oldR.RegisterPlugin(plugin1))
|
||||
plugin := dispensePlugin(t, oldR)
|
||||
require.Equal(t, "alloc-1", plugin.AllocID)
|
||||
|
||||
// client restarts and we load state from disk.
|
||||
// most recently inserted plugin is current
|
||||
|
||||
newR := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn})
|
||||
plugin = dispensePlugin(t, newR)
|
||||
require.Equal(t, "/var/data/alloc/alloc-1/csi.sock", plugin.ConnectionInfo.SocketPath)
|
||||
require.Equal(t, "alloc-1", plugin.AllocID)
|
||||
|
||||
// RestoreTask fires for all allocations, which runs the
|
||||
// plugin_supervisor_hook. But there's a race and the allocations
|
||||
// in this scenario are Restored in the opposite order they were
|
||||
// created
|
||||
|
||||
require.NoError(t, newR.RegisterPlugin(plugin0))
|
||||
plugin = dispensePlugin(t, newR)
|
||||
require.Equal(t, "/var/data/alloc/alloc-1/csi.sock", plugin.ConnectionInfo.SocketPath)
|
||||
require.Equal(t, "alloc-1", plugin.AllocID)
|
||||
})
|
||||
|
||||
t.Run("replacement races on host restart", func(t *testing.T) {
|
||||
plugin0 := newPlugin(0)
|
||||
plugin1 := newPlugin(1)
|
||||
plugin2 := newPlugin(2)
|
||||
|
||||
memdb := &MemDB{}
|
||||
oldR := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn})
|
||||
|
||||
// add a plugin and a new alloc running the same plugin
|
||||
// (without stopping the old one)
|
||||
require.NoError(t, oldR.RegisterPlugin(plugin0))
|
||||
require.NoError(t, oldR.RegisterPlugin(plugin1))
|
||||
plugin := dispensePlugin(t, oldR)
|
||||
require.Equal(t, "alloc-1", plugin.AllocID)
|
||||
|
||||
// client restarts and we load state from disk.
|
||||
// most recently inserted plugin is current
|
||||
|
||||
newR := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn})
|
||||
plugin = dispensePlugin(t, newR)
|
||||
require.Equal(t, "/var/data/alloc/alloc-1/csi.sock", plugin.ConnectionInfo.SocketPath)
|
||||
require.Equal(t, "alloc-1", plugin.AllocID)
|
||||
|
||||
// RestoreTask fires for all allocations but none of them are
|
||||
// running because we restarted the whole host. Server gives
|
||||
// us a replacement alloc
|
||||
|
||||
require.NoError(t, newR.RegisterPlugin(plugin2))
|
||||
plugin = dispensePlugin(t, newR)
|
||||
require.Equal(t, "/var/data/alloc/alloc-2/csi.sock", plugin.ConnectionInfo.SocketPath)
|
||||
require.Equal(t, "alloc-2", plugin.AllocID)
|
||||
})
|
||||
|
||||
t.Run("interleaved register and deregister", func(t *testing.T) {
|
||||
plugin0 := newPlugin(0)
|
||||
plugin1 := newPlugin(1)
|
||||
|
||||
memdb := &MemDB{}
|
||||
reg := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn})
|
||||
|
||||
require.NoError(t, reg.RegisterPlugin(plugin0))
|
||||
|
||||
// replacement is registered before old plugin deregisters
|
||||
require.NoError(t, reg.RegisterPlugin(plugin1))
|
||||
plugin := dispensePlugin(t, reg)
|
||||
require.Equal(t, "alloc-1", plugin.AllocID)
|
||||
|
||||
reg.DeregisterPlugin(PluginTypeCSINode, "my-plugin", "alloc-0")
|
||||
plugin = dispensePlugin(t, reg)
|
||||
require.Equal(t, "alloc-1", plugin.AllocID)
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
// MemDB implements a StateDB that stores data in memory and should only be
|
||||
// used for testing. All methods are safe for concurrent use. This is a
|
||||
// partial implementation of the MemDB in the client/state package, copied
|
||||
|
|
|
@ -55,7 +55,7 @@ func New(config *Config) Manager {
|
|||
|
||||
type csiManager struct {
|
||||
// instances should only be accessed from the run() goroutine and the shutdown
|
||||
// fn. It is a map of PluginType : [PluginName : instanceManager]
|
||||
// fn. It is a map of PluginType : [PluginName : *instanceManager]
|
||||
instances map[string]map[string]*instanceManager
|
||||
|
||||
registry dynamicplugins.Registry
|
||||
|
@ -167,11 +167,19 @@ func (c *csiManager) ensureInstance(plugin *dynamicplugins.PluginInfo) {
|
|||
name := plugin.Name
|
||||
ptype := plugin.Type
|
||||
instances := c.instancesForType(ptype)
|
||||
if _, ok := instances[name]; !ok {
|
||||
c.logger.Debug("detected new CSI plugin", "name", name, "type", ptype)
|
||||
mgr, ok := instances[name]
|
||||
if !ok {
|
||||
c.logger.Debug("detected new CSI plugin", "name", name, "type", ptype, "alloc", plugin.AllocID)
|
||||
mgr := newInstanceManager(c.logger, c.eventer, c.updateNodeCSIInfoFunc, plugin)
|
||||
instances[name] = mgr
|
||||
mgr.run()
|
||||
} else if mgr.allocID != plugin.AllocID {
|
||||
mgr.shutdown()
|
||||
c.logger.Debug("detected update for CSI plugin", "name", name, "type", ptype, "alloc", plugin.AllocID)
|
||||
mgr := newInstanceManager(c.logger, c.eventer, c.updateNodeCSIInfoFunc, plugin)
|
||||
instances[name] = mgr
|
||||
mgr.run()
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -182,9 +190,11 @@ func (c *csiManager) ensureNoInstance(plugin *dynamicplugins.PluginInfo) {
|
|||
ptype := plugin.Type
|
||||
instances := c.instancesForType(ptype)
|
||||
if mgr, ok := instances[name]; ok {
|
||||
c.logger.Debug("shutting down CSI plugin", "name", name, "type", ptype)
|
||||
mgr.shutdown()
|
||||
delete(instances, name)
|
||||
if mgr.allocID == plugin.AllocID {
|
||||
c.logger.Debug("shutting down CSI plugin", "name", name, "type", ptype, "alloc", plugin.AllocID)
|
||||
mgr.shutdown()
|
||||
delete(instances, name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package csimanager
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -13,100 +15,85 @@ import (
|
|||
|
||||
var _ pluginmanager.PluginManager = (*csiManager)(nil)
|
||||
|
||||
var fakePlugin = &dynamicplugins.PluginInfo{
|
||||
Name: "my-plugin",
|
||||
Type: "csi-controller",
|
||||
ConnectionInfo: &dynamicplugins.PluginConnectionInfo{},
|
||||
func fakePlugin(idx int, pluginType string) *dynamicplugins.PluginInfo {
|
||||
id := fmt.Sprintf("alloc-%d", idx)
|
||||
return &dynamicplugins.PluginInfo{
|
||||
Name: "my-plugin",
|
||||
Type: pluginType,
|
||||
Version: fmt.Sprintf("v%d", idx),
|
||||
ConnectionInfo: &dynamicplugins.PluginConnectionInfo{
|
||||
SocketPath: "/var/data/alloc/" + id + "/csi.sock"},
|
||||
AllocID: id,
|
||||
}
|
||||
}
|
||||
|
||||
func setupRegistry() dynamicplugins.Registry {
|
||||
func testManager(t *testing.T, registry dynamicplugins.Registry, resyncPeriod time.Duration) *csiManager {
|
||||
return New(&Config{
|
||||
Logger: testlog.HCLogger(t),
|
||||
DynamicRegistry: registry,
|
||||
UpdateNodeCSIInfoFunc: func(string, *structs.CSIInfo) {},
|
||||
PluginResyncPeriod: resyncPeriod,
|
||||
}).(*csiManager)
|
||||
}
|
||||
|
||||
func setupRegistry(reg *MemDB) dynamicplugins.Registry {
|
||||
return dynamicplugins.NewRegistry(
|
||||
nil,
|
||||
reg,
|
||||
map[string]dynamicplugins.PluginDispenser{
|
||||
"csi-controller": func(*dynamicplugins.PluginInfo) (interface{}, error) {
|
||||
return nil, nil
|
||||
"csi-controller": func(i *dynamicplugins.PluginInfo) (interface{}, error) {
|
||||
return i, nil
|
||||
},
|
||||
"csi-node": func(i *dynamicplugins.PluginInfo) (interface{}, error) {
|
||||
return i, nil
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func TestManager_Setup_Shutdown(t *testing.T) {
|
||||
r := setupRegistry()
|
||||
defer r.Shutdown()
|
||||
|
||||
cfg := &Config{
|
||||
Logger: testlog.HCLogger(t),
|
||||
DynamicRegistry: r,
|
||||
UpdateNodeCSIInfoFunc: func(string, *structs.CSIInfo) {},
|
||||
}
|
||||
pm := New(cfg).(*csiManager)
|
||||
pm.Run()
|
||||
pm.Shutdown()
|
||||
}
|
||||
|
||||
func TestManager_RegisterPlugin(t *testing.T) {
|
||||
registry := setupRegistry()
|
||||
registry := setupRegistry(nil)
|
||||
defer registry.Shutdown()
|
||||
|
||||
require.NotNil(t, registry)
|
||||
|
||||
cfg := &Config{
|
||||
Logger: testlog.HCLogger(t),
|
||||
DynamicRegistry: registry,
|
||||
UpdateNodeCSIInfoFunc: func(string, *structs.CSIInfo) {},
|
||||
}
|
||||
pm := New(cfg).(*csiManager)
|
||||
pm := testManager(t, registry, time.Hour)
|
||||
defer pm.Shutdown()
|
||||
|
||||
require.NotNil(t, pm.registry)
|
||||
|
||||
err := registry.RegisterPlugin(fakePlugin)
|
||||
require.Nil(t, err)
|
||||
plugin := fakePlugin(0, dynamicplugins.PluginTypeCSIController)
|
||||
err := registry.RegisterPlugin(plugin)
|
||||
require.NoError(t, err)
|
||||
|
||||
pm.Run()
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
pmap, ok := pm.instances[fakePlugin.Type]
|
||||
pmap, ok := pm.instances[plugin.Type]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
_, ok = pmap[fakePlugin.Name]
|
||||
_, ok = pmap[plugin.Name]
|
||||
return ok
|
||||
}, 5*time.Second, 10*time.Millisecond)
|
||||
}
|
||||
|
||||
func TestManager_DeregisterPlugin(t *testing.T) {
|
||||
registry := setupRegistry()
|
||||
registry := setupRegistry(nil)
|
||||
defer registry.Shutdown()
|
||||
|
||||
require.NotNil(t, registry)
|
||||
|
||||
cfg := &Config{
|
||||
Logger: testlog.HCLogger(t),
|
||||
DynamicRegistry: registry,
|
||||
UpdateNodeCSIInfoFunc: func(string, *structs.CSIInfo) {},
|
||||
PluginResyncPeriod: 500 * time.Millisecond,
|
||||
}
|
||||
pm := New(cfg).(*csiManager)
|
||||
pm := testManager(t, registry, 500*time.Millisecond)
|
||||
defer pm.Shutdown()
|
||||
|
||||
require.NotNil(t, pm.registry)
|
||||
|
||||
err := registry.RegisterPlugin(fakePlugin)
|
||||
require.Nil(t, err)
|
||||
plugin := fakePlugin(0, dynamicplugins.PluginTypeCSIController)
|
||||
err := registry.RegisterPlugin(plugin)
|
||||
require.NoError(t, err)
|
||||
|
||||
pm.Run()
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
_, ok := pm.instances[fakePlugin.Type][fakePlugin.Name]
|
||||
_, ok := pm.instances[plugin.Type][plugin.Name]
|
||||
return ok
|
||||
}, 5*time.Second, 10*time.Millisecond)
|
||||
|
||||
err = registry.DeregisterPlugin(fakePlugin.Type, fakePlugin.Name)
|
||||
require.Nil(t, err)
|
||||
err = registry.DeregisterPlugin(plugin.Type, plugin.Name, "alloc-0")
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
_, ok := pm.instances[fakePlugin.Type][fakePlugin.Name]
|
||||
_, ok := pm.instances[plugin.Type][plugin.Name]
|
||||
return !ok
|
||||
}, 5*time.Second, 10*time.Millisecond)
|
||||
}
|
||||
|
@ -115,47 +102,149 @@ func TestManager_DeregisterPlugin(t *testing.T) {
|
|||
// name but different types (as found with monolith plugins) don't interfere
|
||||
// with each other.
|
||||
func TestManager_MultiplePlugins(t *testing.T) {
|
||||
registry := setupRegistry()
|
||||
registry := setupRegistry(nil)
|
||||
defer registry.Shutdown()
|
||||
|
||||
require.NotNil(t, registry)
|
||||
|
||||
cfg := &Config{
|
||||
Logger: testlog.HCLogger(t),
|
||||
DynamicRegistry: registry,
|
||||
UpdateNodeCSIInfoFunc: func(string, *structs.CSIInfo) {},
|
||||
PluginResyncPeriod: 500 * time.Millisecond,
|
||||
}
|
||||
pm := New(cfg).(*csiManager)
|
||||
pm := testManager(t, registry, 500*time.Millisecond)
|
||||
defer pm.Shutdown()
|
||||
|
||||
require.NotNil(t, pm.registry)
|
||||
controllerPlugin := fakePlugin(0, dynamicplugins.PluginTypeCSIController)
|
||||
err := registry.RegisterPlugin(controllerPlugin)
|
||||
require.NoError(t, err)
|
||||
|
||||
err := registry.RegisterPlugin(fakePlugin)
|
||||
require.Nil(t, err)
|
||||
|
||||
fakeNodePlugin := *fakePlugin
|
||||
fakeNodePlugin.Type = "csi-node"
|
||||
err = registry.RegisterPlugin(&fakeNodePlugin)
|
||||
require.Nil(t, err)
|
||||
nodePlugin := fakePlugin(0, dynamicplugins.PluginTypeCSINode)
|
||||
err = registry.RegisterPlugin(nodePlugin)
|
||||
require.NoError(t, err)
|
||||
|
||||
pm.Run()
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
_, ok := pm.instances[fakePlugin.Type][fakePlugin.Name]
|
||||
_, ok := pm.instances[controllerPlugin.Type][controllerPlugin.Name]
|
||||
return ok
|
||||
}, 5*time.Second, 10*time.Millisecond)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
_, ok := pm.instances[fakeNodePlugin.Type][fakeNodePlugin.Name]
|
||||
_, ok := pm.instances[nodePlugin.Type][nodePlugin.Name]
|
||||
return ok
|
||||
}, 5*time.Second, 10*time.Millisecond)
|
||||
|
||||
err = registry.DeregisterPlugin(fakePlugin.Type, fakePlugin.Name)
|
||||
require.Nil(t, err)
|
||||
err = registry.DeregisterPlugin(controllerPlugin.Type, controllerPlugin.Name, "alloc-0")
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
_, ok := pm.instances[fakePlugin.Type][fakePlugin.Name]
|
||||
_, ok := pm.instances[controllerPlugin.Type][controllerPlugin.Name]
|
||||
return !ok
|
||||
}, 5*time.Second, 10*time.Millisecond)
|
||||
}
|
||||
|
||||
// TestManager_ConcurrentPlugins exercises the behavior when multiple
|
||||
// allocations for the same plugin interact
|
||||
func TestManager_ConcurrentPlugins(t *testing.T) {
|
||||
|
||||
t.Run("replacement races on host restart", func(t *testing.T) {
|
||||
plugin0 := fakePlugin(0, dynamicplugins.PluginTypeCSINode)
|
||||
plugin1 := fakePlugin(1, dynamicplugins.PluginTypeCSINode)
|
||||
plugin2 := fakePlugin(2, dynamicplugins.PluginTypeCSINode)
|
||||
|
||||
db := &MemDB{}
|
||||
registry := setupRegistry(db)
|
||||
pm := testManager(t, registry, time.Hour) // no resync except from events
|
||||
pm.Run()
|
||||
|
||||
require.NoError(t, registry.RegisterPlugin(plugin0))
|
||||
require.NoError(t, registry.RegisterPlugin(plugin1))
|
||||
require.Eventuallyf(t, func() bool {
|
||||
im, _ := pm.instances[plugin0.Type][plugin0.Name]
|
||||
return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" &&
|
||||
im.allocID == "alloc-1"
|
||||
}, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin did not become active plugin")
|
||||
|
||||
pm.Shutdown()
|
||||
registry.Shutdown()
|
||||
|
||||
// client restarts and we load state from disk.
|
||||
// most recently inserted plugin is current
|
||||
|
||||
registry = setupRegistry(db)
|
||||
defer registry.Shutdown()
|
||||
pm = testManager(t, registry, time.Hour)
|
||||
defer pm.Shutdown()
|
||||
pm.Run()
|
||||
|
||||
require.Eventuallyf(t, func() bool {
|
||||
im, _ := pm.instances[plugin0.Type][plugin0.Name]
|
||||
return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" &&
|
||||
im.allocID == "alloc-1"
|
||||
}, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin was not active after state reload")
|
||||
|
||||
// RestoreTask fires for all allocations but none of them are
|
||||
// running because we restarted the whole host. Server gives
|
||||
// us a replacement alloc
|
||||
|
||||
require.NoError(t, registry.RegisterPlugin(plugin2))
|
||||
require.Eventuallyf(t, func() bool {
|
||||
im, _ := pm.instances[plugin0.Type][plugin0.Name]
|
||||
return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-2/csi.sock" &&
|
||||
im.allocID == "alloc-2"
|
||||
}, 5*time.Second, 10*time.Millisecond, "alloc-2 plugin was not active after replacement")
|
||||
|
||||
})
|
||||
|
||||
t.Run("interleaved register and deregister", func(t *testing.T) {
|
||||
plugin0 := fakePlugin(0, dynamicplugins.PluginTypeCSINode)
|
||||
plugin1 := fakePlugin(1, dynamicplugins.PluginTypeCSINode)
|
||||
|
||||
db := &MemDB{}
|
||||
registry := setupRegistry(db)
|
||||
defer registry.Shutdown()
|
||||
|
||||
pm := testManager(t, registry, time.Hour) // no resync except from events
|
||||
defer pm.Shutdown()
|
||||
pm.Run()
|
||||
|
||||
require.NoError(t, registry.RegisterPlugin(plugin0))
|
||||
require.NoError(t, registry.RegisterPlugin(plugin1))
|
||||
|
||||
require.Eventuallyf(t, func() bool {
|
||||
im, _ := pm.instances[plugin0.Type][plugin0.Name]
|
||||
return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" &&
|
||||
im.allocID == "alloc-1"
|
||||
}, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin did not become active plugin")
|
||||
|
||||
registry.DeregisterPlugin(dynamicplugins.PluginTypeCSINode, "my-plugin", "alloc-0")
|
||||
|
||||
require.Eventuallyf(t, func() bool {
|
||||
im, _ := pm.instances[plugin0.Type][plugin0.Name]
|
||||
return im != nil &&
|
||||
im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock"
|
||||
}, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin should still be active plugin")
|
||||
})
|
||||
}
|
||||
|
||||
// MemDB implements a StateDB that stores data in memory and should only be
|
||||
// used for testing. All methods are safe for concurrent use. This is a
|
||||
// partial implementation of the MemDB in the client/state package, copied
|
||||
// here to avoid circular dependencies.
|
||||
type MemDB struct {
|
||||
dynamicManagerPs *dynamicplugins.RegistryState
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func (m *MemDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState, error) {
|
||||
if m == nil {
|
||||
return nil, nil
|
||||
}
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
return m.dynamicManagerPs, nil
|
||||
}
|
||||
|
||||
func (m *MemDB) PutDynamicPluginRegistryState(ps *dynamicplugins.RegistryState) error {
|
||||
if m == nil {
|
||||
return nil
|
||||
}
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.dynamicManagerPs = ps
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
package state
|
||||
|
||||
import "github.com/hashicorp/nomad/client/dynamicplugins"
|
||||
|
||||
// RegistryState12 is the dynamic plugin registry state persisted
|
||||
// before 1.3.0.
|
||||
type RegistryState12 struct {
|
||||
Plugins map[string]map[string]*dynamicplugins.PluginInfo
|
||||
}
|
|
@ -52,7 +52,7 @@ var (
|
|||
// metaVersion is the value of the state schema version to detect when
|
||||
// an upgrade is needed. It skips the usual boltdd/msgpack backend to
|
||||
// be as portable and futureproof as possible.
|
||||
metaVersion = []byte{'2'}
|
||||
metaVersion = []byte{'3'}
|
||||
|
||||
// metaUpgradedKey is the key that stores the timestamp of the last
|
||||
// time the schema was upgraded.
|
||||
|
@ -90,9 +90,9 @@ var (
|
|||
// stored at
|
||||
managerPluginStateKey = []byte("plugin_state")
|
||||
|
||||
// dynamicPluginBucket is the bucket name containing all dynamic plugin
|
||||
// dynamicPluginBucketName is the bucket name containing all dynamic plugin
|
||||
// registry data. each dynamic plugin registry will have its own subbucket.
|
||||
dynamicPluginBucket = []byte("dynamicplugins")
|
||||
dynamicPluginBucketName = []byte("dynamicplugins")
|
||||
|
||||
// registryStateKey is the key at which dynamic plugin registry state is stored
|
||||
registryStateKey = []byte("registry_state")
|
||||
|
@ -677,7 +677,7 @@ func (s *BoltStateDB) GetDriverPluginState() (*driverstate.PluginState, error) {
|
|||
func (s *BoltStateDB) PutDynamicPluginRegistryState(ps *dynamicplugins.RegistryState) error {
|
||||
return s.db.Update(func(tx *boltdd.Tx) error {
|
||||
// Retrieve the root dynamic plugin manager bucket
|
||||
dynamicBkt, err := tx.CreateBucketIfNotExists(dynamicPluginBucket)
|
||||
dynamicBkt, err := tx.CreateBucketIfNotExists(dynamicPluginBucketName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -691,7 +691,7 @@ func (s *BoltStateDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryS
|
|||
var ps *dynamicplugins.RegistryState
|
||||
|
||||
err := s.db.View(func(tx *boltdd.Tx) error {
|
||||
dynamicBkt := tx.Bucket(dynamicPluginBucket)
|
||||
dynamicBkt := tx.Bucket(dynamicPluginBucketName)
|
||||
if dynamicBkt == nil {
|
||||
// No state, return
|
||||
return nil
|
||||
|
@ -742,11 +742,11 @@ func (s *BoltStateDB) updateWithOptions(opts []WriteOption, updateFn func(tx *bo
|
|||
// 0.9 schema. Creates a backup before upgrading.
|
||||
func (s *BoltStateDB) Upgrade() error {
|
||||
// Check to see if the underlying DB needs upgrading.
|
||||
upgrade, err := NeedsUpgrade(s.db.BoltDB())
|
||||
upgrade09, upgrade13, err := NeedsUpgrade(s.db.BoltDB())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !upgrade {
|
||||
if !upgrade09 && !upgrade13 {
|
||||
// No upgrade needed!
|
||||
return nil
|
||||
}
|
||||
|
@ -759,8 +759,16 @@ func (s *BoltStateDB) Upgrade() error {
|
|||
|
||||
// Perform the upgrade
|
||||
if err := s.db.Update(func(tx *boltdd.Tx) error {
|
||||
if err := UpgradeAllocs(s.logger, tx); err != nil {
|
||||
return err
|
||||
|
||||
if upgrade09 {
|
||||
if err := UpgradeAllocs(s.logger, tx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if upgrade13 {
|
||||
if err := UpgradeDynamicPluginRegistry(s.logger, tx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Add standard metadata
|
||||
|
@ -773,6 +781,7 @@ func (s *BoltStateDB) Upgrade() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return bkt.Put(metaUpgradedKey, time.Now().Format(time.RFC3339))
|
||||
}); err != nil {
|
||||
return err
|
||||
|
|
Binary file not shown.
|
@ -2,21 +2,24 @@ package state
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"container/list"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
hclog "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
"github.com/hashicorp/nomad/client/dynamicplugins"
|
||||
"github.com/hashicorp/nomad/helper/boltdd"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
// NeedsUpgrade returns true if the BoltDB needs upgrading or false if it is
|
||||
// already up to date.
|
||||
func NeedsUpgrade(bdb *bolt.DB) (bool, error) {
|
||||
needsUpgrade := true
|
||||
err := bdb.View(func(tx *bolt.Tx) error {
|
||||
func NeedsUpgrade(bdb *bolt.DB) (upgradeTo09, upgradeTo13 bool, err error) {
|
||||
upgradeTo09 = true
|
||||
upgradeTo13 = true
|
||||
err = bdb.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(metaBucketName)
|
||||
if b == nil {
|
||||
// No meta bucket; upgrade
|
||||
|
@ -29,18 +32,23 @@ func NeedsUpgrade(bdb *bolt.DB) (bool, error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
if !bytes.Equal(v, metaVersion) {
|
||||
// Version exists but does not match. Abort.
|
||||
return fmt.Errorf("incompatible state version. expected %q but found %q",
|
||||
metaVersion, v)
|
||||
if bytes.Equal(v, []byte{'2'}) {
|
||||
upgradeTo09 = false
|
||||
return nil
|
||||
}
|
||||
if bytes.Equal(v, metaVersion) {
|
||||
upgradeTo09 = false
|
||||
upgradeTo13 = false
|
||||
return nil
|
||||
}
|
||||
|
||||
// Version matches! Assume migrated!
|
||||
needsUpgrade = false
|
||||
return nil
|
||||
// Version exists but does not match. Abort.
|
||||
return fmt.Errorf("incompatible state version. expected %q but found %q",
|
||||
metaVersion, v)
|
||||
|
||||
})
|
||||
|
||||
return needsUpgrade, err
|
||||
return
|
||||
}
|
||||
|
||||
// addMeta adds version metadata to BoltDB to mark it as upgraded and
|
||||
|
@ -51,7 +59,6 @@ func addMeta(tx *bolt.Tx) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return bkt.Put(metaVersionKey, metaVersion)
|
||||
}
|
||||
|
||||
|
@ -312,3 +319,32 @@ func upgradeOldAllocMutable(tx *boltdd.Tx, allocID string, oldBytes []byte) erro
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func UpgradeDynamicPluginRegistry(logger hclog.Logger, tx *boltdd.Tx) error {
|
||||
|
||||
dynamicBkt := tx.Bucket(dynamicPluginBucketName)
|
||||
if dynamicBkt == nil {
|
||||
return nil // no previous plugins upgrade
|
||||
}
|
||||
|
||||
oldState := &RegistryState12{}
|
||||
if err := dynamicBkt.Get(registryStateKey, oldState); err != nil {
|
||||
if !boltdd.IsErrNotFound(err) {
|
||||
return fmt.Errorf("failed to read dynamic plugin registry state: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
newState := &dynamicplugins.RegistryState{
|
||||
Plugins: make(map[string]map[string]*list.List),
|
||||
}
|
||||
|
||||
for ptype, plugins := range oldState.Plugins {
|
||||
newState.Plugins[ptype] = make(map[string]*list.List)
|
||||
for pname, pluginInfo := range plugins {
|
||||
newState.Plugins[ptype][pname] = list.New()
|
||||
entry := list.Element{Value: pluginInfo}
|
||||
newState.Plugins[ptype][pname].PushFront(entry)
|
||||
}
|
||||
}
|
||||
return dynamicBkt.Put(registryStateKey, newState)
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package state_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"fmt"
|
||||
"io"
|
||||
|
@ -10,6 +11,7 @@ import (
|
|||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/hashicorp/nomad/client/allocrunner"
|
||||
"github.com/hashicorp/nomad/client/allocwatcher"
|
||||
clientconfig "github.com/hashicorp/nomad/client/config"
|
||||
|
@ -32,40 +34,49 @@ import (
|
|||
func TestBoltStateDB_UpgradeOld_Ok(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
files, err := filepath.Glob("testdata/*.db*")
|
||||
require.NoError(t, err)
|
||||
dbFromTestFile := func(t *testing.T, dir, fn string) *BoltStateDB {
|
||||
|
||||
for _, fn := range files {
|
||||
var src io.ReadCloser
|
||||
src, err := os.Open(fn)
|
||||
require.NoError(t, err)
|
||||
defer src.Close()
|
||||
|
||||
// testdata may be gzip'd; decode on copy
|
||||
if strings.HasSuffix(fn, ".gz") {
|
||||
src, err = gzip.NewReader(src)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
dst, err := os.Create(filepath.Join(dir, "state.db"))
|
||||
require.NoError(t, err)
|
||||
|
||||
// Copy test files before testing them for safety
|
||||
_, err = io.Copy(dst, src)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, src.Close())
|
||||
|
||||
dbI, err := NewBoltStateDB(testlog.HCLogger(t), dir)
|
||||
require.NoError(t, err)
|
||||
|
||||
db := dbI.(*BoltStateDB)
|
||||
return db
|
||||
}
|
||||
|
||||
pre09files := []string{
|
||||
"testdata/state-0.7.1.db.gz",
|
||||
"testdata/state-0.8.6-empty.db.gz",
|
||||
"testdata/state-0.8.6-no-deploy.db.gz"}
|
||||
|
||||
for _, fn := range pre09files {
|
||||
t.Run(fn, func(t *testing.T) {
|
||||
|
||||
dir, err := ioutil.TempDir("", "nomadtest")
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
var src io.ReadCloser
|
||||
src, err = os.Open(fn)
|
||||
require.NoError(t, err)
|
||||
defer src.Close()
|
||||
|
||||
// testdata may be gzip'd; decode on copy
|
||||
if strings.HasSuffix(fn, ".gz") {
|
||||
src, err = gzip.NewReader(src)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
dst, err := os.Create(filepath.Join(dir, "state.db"))
|
||||
require.NoError(t, err)
|
||||
|
||||
// Copy test files before testing them for safety
|
||||
_, err = io.Copy(dst, src)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, src.Close())
|
||||
|
||||
dbI, err := NewBoltStateDB(testlog.HCLogger(t), dir)
|
||||
require.NoError(t, err)
|
||||
defer dbI.Close()
|
||||
|
||||
db := dbI.(*BoltStateDB)
|
||||
db := dbFromTestFile(t, dir, fn)
|
||||
defer db.Close()
|
||||
|
||||
// Simply opening old files should *not* alter them
|
||||
require.NoError(t, db.DB().View(func(tx *boltdd.Tx) error {
|
||||
|
@ -76,16 +87,18 @@ func TestBoltStateDB_UpgradeOld_Ok(t *testing.T) {
|
|||
return nil
|
||||
}))
|
||||
|
||||
needsUpgrade, err := NeedsUpgrade(db.DB().BoltDB())
|
||||
to09, to12, err := NeedsUpgrade(db.DB().BoltDB())
|
||||
require.NoError(t, err)
|
||||
require.True(t, needsUpgrade)
|
||||
require.True(t, to09)
|
||||
require.True(t, to12)
|
||||
|
||||
// Attept the upgrade
|
||||
// Attempt the upgrade
|
||||
require.NoError(t, db.Upgrade())
|
||||
|
||||
needsUpgrade, err = NeedsUpgrade(db.DB().BoltDB())
|
||||
to09, to12, err = NeedsUpgrade(db.DB().BoltDB())
|
||||
require.NoError(t, err)
|
||||
require.False(t, needsUpgrade)
|
||||
require.False(t, to09)
|
||||
require.False(t, to12)
|
||||
|
||||
// Ensure Allocations can be restored and
|
||||
// NewAR/AR.Restore do not error.
|
||||
|
@ -109,9 +122,59 @@ func TestBoltStateDB_UpgradeOld_Ok(t *testing.T) {
|
|||
}
|
||||
require.NoError(t, db.PutDevicePluginState(ps))
|
||||
|
||||
registry, err := db.GetDynamicPluginRegistryState()
|
||||
require.Nil(t, registry)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, db.Close())
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("testdata/state-1.2.6.db.gz", func(t *testing.T) {
|
||||
fn := "testdata/state-1.2.6.db.gz"
|
||||
dir, err := ioutil.TempDir("", "nomadtest")
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
db := dbFromTestFile(t, dir, fn)
|
||||
defer db.Close()
|
||||
|
||||
// Simply opening old files should *not* alter them
|
||||
db.DB().BoltDB().View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket([]byte("meta"))
|
||||
if b == nil {
|
||||
return fmt.Errorf("meta bucket should exist")
|
||||
}
|
||||
v := b.Get([]byte("version"))
|
||||
if len(v) == 0 {
|
||||
return fmt.Errorf("meta version is missing")
|
||||
}
|
||||
if !bytes.Equal(v, []byte{'2'}) {
|
||||
return fmt.Errorf("meta version should not have changed")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
to09, to12, err := NeedsUpgrade(db.DB().BoltDB())
|
||||
require.NoError(t, err)
|
||||
require.False(t, to09)
|
||||
require.True(t, to12)
|
||||
|
||||
// Attempt the upgrade
|
||||
require.NoError(t, db.Upgrade())
|
||||
|
||||
to09, to12, err = NeedsUpgrade(db.DB().BoltDB())
|
||||
require.NoError(t, err)
|
||||
require.False(t, to09)
|
||||
require.False(t, to12)
|
||||
|
||||
registry, err := db.GetDynamicPluginRegistryState()
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, registry)
|
||||
require.Len(t, registry.Plugins["csi-node"], 2)
|
||||
|
||||
require.NoError(t, db.Close())
|
||||
})
|
||||
}
|
||||
|
||||
// checkUpgradedAlloc creates and restores an AllocRunner from an upgraded
|
||||
|
|
|
@ -38,9 +38,10 @@ func TestUpgrade_NeedsUpgrade_New(t *testing.T) {
|
|||
db, cleanup := setupBoltStateDB(t)
|
||||
defer cleanup()
|
||||
|
||||
up, err := NeedsUpgrade(db.DB().BoltDB())
|
||||
to09, to12, err := NeedsUpgrade(db.DB().BoltDB())
|
||||
require.NoError(t, err)
|
||||
require.False(t, up)
|
||||
require.False(t, to09)
|
||||
require.False(t, to12)
|
||||
}
|
||||
|
||||
// TestUpgrade_NeedsUpgrade_Old asserts state dbs with just the allocations
|
||||
|
@ -58,16 +59,18 @@ func TestUpgrade_NeedsUpgrade_Old(t *testing.T) {
|
|||
return err
|
||||
}))
|
||||
|
||||
up, err := NeedsUpgrade(db)
|
||||
to09, to12, err := NeedsUpgrade(db)
|
||||
require.NoError(t, err)
|
||||
require.True(t, up)
|
||||
require.True(t, to09)
|
||||
require.True(t, to12)
|
||||
|
||||
// Adding meta should mark it as upgraded
|
||||
require.NoError(t, db.Update(addMeta))
|
||||
|
||||
up, err = NeedsUpgrade(db)
|
||||
to09, to12, err = NeedsUpgrade(db)
|
||||
require.NoError(t, err)
|
||||
require.False(t, up)
|
||||
require.False(t, to09)
|
||||
require.False(t, to12)
|
||||
}
|
||||
|
||||
// TestUpgrade_NeedsUpgrade_Error asserts that an error is returned from
|
||||
|
@ -79,7 +82,7 @@ func TestUpgrade_NeedsUpgrade_Error(t *testing.T) {
|
|||
cases := [][]byte{
|
||||
{'"', '2', '"'}, // wrong type
|
||||
{'1'}, // wrong version (never existed)
|
||||
{'3'}, // wrong version (future)
|
||||
{'4'}, // wrong version (future)
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
|
@ -95,7 +98,7 @@ func TestUpgrade_NeedsUpgrade_Error(t *testing.T) {
|
|||
return bkt.Put(metaVersionKey, tc)
|
||||
}))
|
||||
|
||||
_, err := NeedsUpgrade(db)
|
||||
_, _, err := NeedsUpgrade(db)
|
||||
require.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -25,6 +25,32 @@ In Nomad 1.3.0, the default raft protocol version has been updated to
|
|||
server will automatically upgrade that server's raft protocol. See the
[Upgrading to Raft Protocol 3] guide.

#### Client State Store

The client state store will be automatically migrated to a new schema
version when upgrading a client.

Downgrading to a previous version of the client after upgrading it to
Nomad 1.3 is not supported. To downgrade safely, users should drain
all tasks from the Nomad client and erase its data directory.

#### CSI Plugins

The client filesystem layout for CSI plugins has been updated to
correctly handle the lifecycle of multiple allocations serving the
same plugin. Running plugin tasks will not be updated after upgrading
the client, but it is recommended to redeploy CSI plugin jobs after
upgrading the cluster.

The directory for plugin control sockets will be mounted from a new
per-allocation directory in the client data dir. This will still be
bind-mounted to `csi_plugin.mount_dir` as in versions of Nomad
prior to 1.3.0.

The volume staging directory for new CSI plugin tasks will now be
mounted to the task's `NOMAD_TASK_DIR` instead of the
`csi_plugin.mount_dir`.
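For operators checking a plugin task after the upgrade, a small hedged sketch of where the staging tree now appears inside the task (paths are illustrative; `NOMAD_TASK_DIR` resolves to the task's `local/` directory):

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

func main() {
	// Inside a CSI plugin task on Nomad 1.3+, staging and per-alloc volume
	// mounts live under the task directory instead of csi_plugin.mount_dir.
	taskDir := os.Getenv("NOMAD_TASK_DIR") // e.g. /local for the docker driver

	stagingRoot := filepath.Join(taskDir, "csi", "staging")    // {volume-id}/{usage-mode}/ below this
	perAllocRoot := filepath.Join(taskDir, "csi", "per-alloc") // {alloc-id}/{volume-id}/{usage-mode}/

	fmt.Println(stagingRoot, perAllocRoot)
}
```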

## Nomad 1.2.6, 1.1.12, and 1.0.18

#### ACL requirement for the job parse endpoint