From 246db87a748e2a8a40c56db93e77b84defdb234d Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Wed, 23 Feb 2022 15:23:07 -0500 Subject: [PATCH] CSI: allow for concurrent plugin allocations (#12078) The dynamic plugin registry assumes that plugins are singletons, which matches the behavior of other Nomad plugins. But because dynamic plugins like CSI are implemented by allocations, we need to handle the possibility of multiple allocations for a given plugin type + ID, as well as behaviors around interleaved allocation starts and stops. Update the data structure for the dynamic registry so that more recent allocations take over as the instance manager singleton, but we still preserve the previous running allocations so that restores work without racing. Multiple allocations can run on a client for the same plugin, even if only during updates. Provide each plugin task a unique path for the control socket so that the tasks don't interfere with each other. --- .changelog/12078.txt | 11 + .../taskrunner/plugin_supervisor_hook.go | 100 +++++-- client/dynamicplugins/registry.go | 96 +++++-- client/dynamicplugins/registry_test.go | 126 ++++++++- client/pluginmanager/csimanager/manager.go | 22 +- .../pluginmanager/csimanager/manager_test.go | 249 ++++++++++++------ client/state/12types.go | 9 + client/state/state_database.go | 27 +- client/state/testdata/state-1.2.6.db.gz | 3 + client/state/upgrade.go | 60 ++++- client/state/upgrade_int_test.go | 129 ++++++--- client/state/upgrade_test.go | 19 +- .../content/docs/upgrade/upgrade-specific.mdx | 26 ++ 13 files changed, 680 insertions(+), 197 deletions(-) create mode 100644 .changelog/12078.txt create mode 100644 client/state/12types.go create mode 100644 client/state/testdata/state-1.2.6.db.gz diff --git a/.changelog/12078.txt b/.changelog/12078.txt new file mode 100644 index 000000000..96e335d94 --- /dev/null +++ b/.changelog/12078.txt @@ -0,0 +1,11 @@ +```release-note:improvement +csi: Allow for concurrent plugin allocations +``` + +```release-note:breaking-change +client: The client state store will be automatically migrated to a new schema version when upgrading a client. Downgrading to a previous version of the client after upgrading it to Nomad 1.3 is not supported. To downgrade safely, users should erase the Nomad client's data directory. +``` + +```release-note:breaking-change +csi: The client filesystem layout for CSI plugins has been updated to correctly handle the lifecycle of multiple allocations serving the same plugin. Running plugin tasks will not be updated after upgrading the client, but it is recommended to redeploy CSI plugin jobs after upgrading the cluster. +``` diff --git a/client/allocrunner/taskrunner/plugin_supervisor_hook.go b/client/allocrunner/taskrunner/plugin_supervisor_hook.go index 679fb2f73..94909d3ce 100644 --- a/client/allocrunner/taskrunner/plugin_supervisor_hook.go +++ b/client/allocrunner/taskrunner/plugin_supervisor_hook.go @@ -21,18 +21,20 @@ import ( // tasks. These plugins will be fingerprinted and it will manage connecting them // to their requisite plugin manager. // -// It provides a couple of things to a task running inside Nomad. These are: -// * A mount to the `plugin_mount_dir`, that will then be used by Nomad -// to connect to the nested plugin and handle volume mounts. +// It provides a few things to a plugin task running inside Nomad. 
These are: +// * A mount to the `csi_plugin.mount_dir` where the plugin will create its csi.sock +// * A mount to `local/csi` that node plugins will use to stage volume mounts. // * When the task has started, it starts a loop of attempting to connect to the // plugin, to perform initial fingerprinting of the plugins capabilities before // notifying the plugin manager of the plugin. type csiPluginSupervisorHook struct { - logger hclog.Logger - alloc *structs.Allocation - task *structs.Task - runner *TaskRunner - mountPoint string + logger hclog.Logger + alloc *structs.Allocation + task *structs.Task + runner *TaskRunner + mountPoint string + socketMountPoint string + socketPath string caps *drivers.Capabilities @@ -73,20 +75,36 @@ var _ interfaces.TaskPoststartHook = &csiPluginSupervisorHook{} // with the catalog and to ensure any mounts are cleaned up. var _ interfaces.TaskStopHook = &csiPluginSupervisorHook{} +// This hook creates a csi/ directory within the client's datadir used to +// manage plugins and mount points volumes. The layout is as follows: + +// plugins/ +// {alloc-id}/csi.sock +// Per-allocation directories of unix domain sockets used to communicate +// with the CSI plugin. Nomad creates the directory and the plugin creates +// the socket file. This directory is bind-mounted to the +// csi_plugin.mount_config dir in the plugin task. +// +// {plugin-type}/{plugin-id}/ +// staging/ +// {volume-id}/{usage-mode}/ +// Intermediate mount point used by node plugins that support +// NODE_STAGE_UNSTAGE capability. +// +// per-alloc/ +// {alloc-id}/{volume-id}/{usage-mode}/ +// Mount point bound from the staging directory into tasks that use +// the mounted volumes + func newCSIPluginSupervisorHook(config *csiPluginSupervisorHookConfig) *csiPluginSupervisorHook { task := config.runner.Task() - // The Plugin directory will look something like this: - // . - // .. - // csi.sock - A unix domain socket used to communicate with the CSI Plugin - // staging/ - // {volume-id}/{usage-mode-hash}/ - Intermediary mount point that will be used by plugins that support NODE_STAGE_UNSTAGE capabilities. 
- // per-alloc/ - // {alloc-id}/{volume-id}/{usage-mode-hash}/ - Mount Point that will be bind-mounted into tasks that utilise the volume pluginRoot := filepath.Join(config.clientStateDirPath, "csi", string(task.CSIPluginConfig.Type), task.CSIPluginConfig.ID) + socketMountPoint := filepath.Join(config.clientStateDirPath, "csi", + "plugins", config.runner.Alloc().ID) + shutdownCtx, cancelFn := context.WithCancel(context.Background()) hook := &csiPluginSupervisorHook{ @@ -96,6 +114,7 @@ func newCSIPluginSupervisorHook(config *csiPluginSupervisorHookConfig) *csiPlugi logger: config.logger, task: task, mountPoint: pluginRoot, + socketMountPoint: socketMountPoint, caps: config.capabilities, shutdownCtx: shutdownCtx, shutdownCancelFn: cancelFn, @@ -122,18 +141,46 @@ func (h *csiPluginSupervisorHook) Prestart(ctx context.Context, return fmt.Errorf("failed to create mount point: %v", err) } + if err := os.MkdirAll(h.socketMountPoint, 0700); err != nil && !os.IsExist(err) { + return fmt.Errorf("failed to create socket mount point: %v", err) + } + + // where the socket will be mounted configMount := &drivers.MountConfig{ TaskPath: h.task.CSIPluginConfig.MountDir, + HostPath: h.socketMountPoint, + Readonly: false, + PropagationMode: "bidirectional", + } + // where the staging and per-alloc directories will be mounted + volumeStagingMounts := &drivers.MountConfig{ + // TODO(tgross): add this TaskPath to the CSIPluginConfig as well + TaskPath: "/local/csi", HostPath: h.mountPoint, Readonly: false, PropagationMode: "bidirectional", } + // devices from the host devMount := &drivers.MountConfig{ TaskPath: "/dev", HostPath: "/dev", Readonly: false, } + // TODO(tgross): https://github.com/hashicorp/nomad/issues/11786 + // If we're already registered, we should be able to update the + // definition in the update hook + + // For backwards compatibility, ensure that we don't overwrite the + // socketPath on client restart with existing plugin allocations. 
+ pluginInfo, _ := h.runner.dynamicRegistry.PluginForAlloc( + string(h.task.CSIPluginConfig.Type), h.task.CSIPluginConfig.ID, h.alloc.ID) + if pluginInfo != nil { + h.socketPath = pluginInfo.ConnectionInfo.SocketPath + } else { + h.socketPath = filepath.Join(h.socketMountPoint, structs.CSISocketName) + } + switch h.caps.FSIsolation { case drivers.FSIsolationNone: // Plugin tasks with no filesystem isolation won't have the @@ -142,13 +189,15 @@ func (h *csiPluginSupervisorHook) Prestart(ctx context.Context, // plugins will need to be aware of the csi directory layout // in the client data dir resp.Env = map[string]string{ - "CSI_ENDPOINT": filepath.Join(h.mountPoint, "csi.sock")} + "CSI_ENDPOINT": h.socketPath} default: resp.Env = map[string]string{ - "CSI_ENDPOINT": filepath.Join(h.task.CSIPluginConfig.MountDir, "csi.sock")} + "CSI_ENDPOINT": filepath.Join( + h.task.CSIPluginConfig.MountDir, structs.CSISocketName)} } mounts := ensureMountpointInserted(h.runner.hookResources.getMounts(), configMount) + mounts = ensureMountpointInserted(mounts, volumeStagingMounts) mounts = ensureMountpointInserted(mounts, devMount) h.runner.hookResources.setMounts(mounts) @@ -203,9 +252,7 @@ func (h *csiPluginSupervisorHook) ensureSupervisorLoop(ctx context.Context) { h.runningLock.Unlock() }() - socketPath := filepath.Join(h.mountPoint, structs.CSISocketName) - - client := csi.NewClient(socketPath, h.logger.Named("csi_client").With( + client := csi.NewClient(h.socketPath, h.logger.Named("csi_client").With( "plugin.name", h.task.CSIPluginConfig.ID, "plugin.type", h.task.CSIPluginConfig.Type)) defer client.Close() @@ -249,7 +296,7 @@ WAITFORREADY: } // Step 2: Register the plugin with the catalog. - deregisterPluginFn, err := h.registerPlugin(client, socketPath) + deregisterPluginFn, err := h.registerPlugin(client, h.socketPath) if err != nil { h.kill(ctx, fmt.Errorf("CSI plugin failed to register: %v", err)) return @@ -317,7 +364,7 @@ func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPat Options: map[string]string{ "Provider": info.Name, // vendor name "MountPoint": h.mountPoint, - "ContainerMountPoint": h.task.CSIPluginConfig.MountDir, + "ContainerMountPoint": "/local/csi", }, } } @@ -348,8 +395,9 @@ func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPat // closes over its own registration rname := reg.Name rtype := reg.Type + allocID := reg.AllocID deregistrationFns = append(deregistrationFns, func() { - err := h.runner.dynamicRegistry.DeregisterPlugin(rtype, rname) + err := h.runner.dynamicRegistry.DeregisterPlugin(rtype, rname, allocID) if err != nil { h.logger.Error("failed to deregister csi plugin", "name", rname, "type", rtype, "error", err) } @@ -384,6 +432,10 @@ func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, client // Stop hooks must be idempotent. The context is cancelled prematurely if the // task is killed. 
func (h *csiPluginSupervisorHook) Stop(_ context.Context, req *interfaces.TaskStopRequest, _ *interfaces.TaskStopResponse) error { + err := os.RemoveAll(h.socketMountPoint) + if err != nil { + h.logger.Error("could not remove plugin socket directory", "dir", h.socketMountPoint, "error", err) + } h.shutdownCancelFn() return nil } diff --git a/client/dynamicplugins/registry.go b/client/dynamicplugins/registry.go index ed1710ee5..65f8c355c 100644 --- a/client/dynamicplugins/registry.go +++ b/client/dynamicplugins/registry.go @@ -4,6 +4,7 @@ package dynamicplugins import ( + "container/list" "context" "errors" "fmt" @@ -19,10 +20,11 @@ const ( // that are running as Nomad Tasks. type Registry interface { RegisterPlugin(info *PluginInfo) error - DeregisterPlugin(ptype, name string) error + DeregisterPlugin(ptype, name, allocID string) error ListPlugins(ptype string) []*PluginInfo DispensePlugin(ptype, name string) (interface{}, error) + PluginForAlloc(ptype, name, allocID string) (*PluginInfo, error) PluginsUpdatedCh(ctx context.Context, ptype string) <-chan *PluginUpdateEvent @@ -31,10 +33,11 @@ type Registry interface { StubDispenserForType(ptype string, dispenser PluginDispenser) } -// RegistryState is what we persist in the client state store. It contains -// a map of plugin types to maps of plugin name -> PluginInfo. +// RegistryState is what we persist in the client state +// store. It contains a map of plugin types to maps of plugin name -> +// list of *PluginInfo, sorted by recency of registration type RegistryState struct { - Plugins map[string]map[string]*PluginInfo + Plugins map[string]map[string]*list.List } type PluginDispenser func(info *PluginInfo) (interface{}, error) @@ -44,7 +47,7 @@ type PluginDispenser func(info *PluginInfo) (interface{}, error) func NewRegistry(state StateStorage, dispensers map[string]PluginDispenser) Registry { registry := &dynamicRegistry{ - plugins: make(map[string]map[string]*PluginInfo), + plugins: make(map[string]map[string]*list.List), broadcasters: make(map[string]*pluginEventBroadcaster), dispensers: dispensers, state: state, @@ -122,7 +125,7 @@ type PluginUpdateEvent struct { } type dynamicRegistry struct { - plugins map[string]map[string]*PluginInfo + plugins map[string]map[string]*list.List pluginsLock sync.RWMutex broadcasters map[string]*pluginEventBroadcaster @@ -180,18 +183,35 @@ func (d *dynamicRegistry) RegisterPlugin(info *PluginInfo) error { pmap, ok := d.plugins[info.Type] if !ok { - pmap = make(map[string]*PluginInfo, 1) + pmap = make(map[string]*list.List) d.plugins[info.Type] = pmap } - - pmap[info.Name] = info - - broadcaster := d.broadcasterForPluginType(info.Type) - event := &PluginUpdateEvent{ - EventType: EventTypeRegistered, - Info: info, + infos, ok := pmap[info.Name] + if !ok { + infos = list.New() + pmap[info.Name] = infos + } + + // TODO(tgross): https://github.com/hashicorp/nomad/issues/11786 + // If we're already registered, we should update the definition + // and send a broadcast of any update so the instanceManager can + // be restarted if there's been a change + var alreadyRegistered bool + for e := infos.Front(); e != nil; e = e.Next() { + if e.Value.(*PluginInfo).AllocID == info.AllocID { + alreadyRegistered = true + break + } + } + if !alreadyRegistered { + infos.PushFront(info) + broadcaster := d.broadcasterForPluginType(info.Type) + event := &PluginUpdateEvent{ + EventType: EventTypeRegistered, + Info: info, + } + broadcaster.broadcast(event) } - broadcaster.broadcast(event) return d.sync() } @@ -209,7 +229,7 @@ 
func (d *dynamicRegistry) broadcasterForPluginType(ptype string) *pluginEventBro return broadcaster } -func (d *dynamicRegistry) DeregisterPlugin(ptype, name string) error { +func (d *dynamicRegistry) DeregisterPlugin(ptype, name, allocID string) error { d.pluginsLock.Lock() defer d.pluginsLock.Unlock() @@ -223,6 +243,9 @@ func (d *dynamicRegistry) DeregisterPlugin(ptype, name string) error { // developers during the development of new plugin types. return errors.New("must specify plugin name to deregister") } + if allocID == "" { + return errors.New("must specify plugin allocation ID to deregister") + } pmap, ok := d.plugins[ptype] if !ok { @@ -230,12 +253,20 @@ func (d *dynamicRegistry) DeregisterPlugin(ptype, name string) error { return fmt.Errorf("no plugins registered for type: %s", ptype) } - info, ok := pmap[name] + infos, ok := pmap[name] if !ok { // plugin already deregistered, don't send events or try re-deleting. return nil } - delete(pmap, name) + + var info *PluginInfo + for e := infos.Front(); e != nil; e = e.Next() { + info = e.Value.(*PluginInfo) + if info.AllocID == allocID { + infos.Remove(e) + break + } + } broadcaster := d.broadcasterForPluginType(ptype) event := &PluginUpdateEvent{ @@ -259,7 +290,9 @@ func (d *dynamicRegistry) ListPlugins(ptype string) []*PluginInfo { plugins := make([]*PluginInfo, 0, len(pmap)) for _, info := range pmap { - plugins = append(plugins, info) + if info.Front() != nil { + plugins = append(plugins, info.Front().Value.(*PluginInfo)) + } } return plugins @@ -302,11 +335,32 @@ func (d *dynamicRegistry) DispensePlugin(ptype string, name string) (interface{} } info, ok := pmap[name] - if !ok { + if !ok || info.Front() == nil { return nil, fmt.Errorf("plugin %s for type %s not found", name, ptype) } - return dispenseFunc(info) + return dispenseFunc(info.Front().Value.(*PluginInfo)) +} + +func (d *dynamicRegistry) PluginForAlloc(ptype, name, allocID string) (*PluginInfo, error) { + d.pluginsLock.Lock() + defer d.pluginsLock.Unlock() + + pmap, ok := d.plugins[ptype] + if !ok { + return nil, fmt.Errorf("no plugins registered for type: %s", ptype) + } + + infos, ok := pmap[name] + if ok { + for e := infos.Front(); e != nil; e = e.Next() { + plugin := e.Value.(*PluginInfo) + if plugin.AllocID == allocID { + return plugin, nil + } + } + } + return nil, fmt.Errorf("no plugin for that allocation") } // PluginsUpdatedCh returns a channel over which plugin events for the requested diff --git a/client/dynamicplugins/registry_test.go b/client/dynamicplugins/registry_test.go index a2621c05f..a820a675f 100644 --- a/client/dynamicplugins/registry_test.go +++ b/client/dynamicplugins/registry_test.go @@ -2,6 +2,7 @@ package dynamicplugins import ( "context" + "fmt" "sync" "testing" "time" @@ -132,11 +133,12 @@ func TestDynamicRegistry_DeregisterPlugin_SendsUpdateEvents(t *testing.T) { err := r.RegisterPlugin(&PluginInfo{ Type: "csi", Name: "my-plugin", + AllocID: "alloc-0", ConnectionInfo: &PluginConnectionInfo{}, }) require.NoError(t, err) - err = r.DeregisterPlugin("csi", "my-plugin") + err = r.DeregisterPlugin("csi", "my-plugin", "alloc-0") require.NoError(t, err) require.Eventually(t, func() bool { @@ -178,6 +180,7 @@ func TestDynamicRegistry_IsolatePluginTypes(t *testing.T) { err := r.RegisterPlugin(&PluginInfo{ Type: PluginTypeCSIController, Name: "my-plugin", + AllocID: "alloc-0", ConnectionInfo: &PluginConnectionInfo{}, }) require.NoError(t, err) @@ -185,14 +188,15 @@ func TestDynamicRegistry_IsolatePluginTypes(t *testing.T) { err = 
r.RegisterPlugin(&PluginInfo{ Type: PluginTypeCSINode, Name: "my-plugin", + AllocID: "alloc-1", ConnectionInfo: &PluginConnectionInfo{}, }) require.NoError(t, err) - err = r.DeregisterPlugin(PluginTypeCSIController, "my-plugin") + err = r.DeregisterPlugin(PluginTypeCSIController, "my-plugin", "alloc-0") require.NoError(t, err) - require.Equal(t, len(r.ListPlugins(PluginTypeCSINode)), 1) - require.Equal(t, len(r.ListPlugins(PluginTypeCSIController)), 0) + require.Equal(t, 1, len(r.ListPlugins(PluginTypeCSINode))) + require.Equal(t, 0, len(r.ListPlugins(PluginTypeCSIController))) } func TestDynamicRegistry_StateStore(t *testing.T) { @@ -221,6 +225,120 @@ func TestDynamicRegistry_StateStore(t *testing.T) { require.NoError(t, err) } +func TestDynamicRegistry_ConcurrentAllocs(t *testing.T) { + + t.Parallel() + dispenseFn := func(i *PluginInfo) (interface{}, error) { + return i, nil + } + + newPlugin := func(idx int) *PluginInfo { + id := fmt.Sprintf("alloc-%d", idx) + return &PluginInfo{ + Name: "my-plugin", + Type: PluginTypeCSINode, + Version: fmt.Sprintf("v%d", idx), + ConnectionInfo: &PluginConnectionInfo{ + SocketPath: "/var/data/alloc/" + id + "/csi.sock"}, + AllocID: id, + } + } + + dispensePlugin := func(t *testing.T, reg Registry) *PluginInfo { + result, err := reg.DispensePlugin(PluginTypeCSINode, "my-plugin") + require.NotNil(t, result) + require.NoError(t, err) + plugin := result.(*PluginInfo) + return plugin + } + + t.Run("restore races on client restart", func(t *testing.T) { + plugin0 := newPlugin(0) + plugin1 := newPlugin(1) + + memdb := &MemDB{} + oldR := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn}) + + // add a plugin and a new alloc running the same plugin + // (without stopping the old one) + require.NoError(t, oldR.RegisterPlugin(plugin0)) + require.NoError(t, oldR.RegisterPlugin(plugin1)) + plugin := dispensePlugin(t, oldR) + require.Equal(t, "alloc-1", plugin.AllocID) + + // client restarts and we load state from disk. + // most recently inserted plugin is current + + newR := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn}) + plugin = dispensePlugin(t, oldR) + require.Equal(t, "/var/data/alloc/alloc-1/csi.sock", plugin.ConnectionInfo.SocketPath) + require.Equal(t, "alloc-1", plugin.AllocID) + + // RestoreTask fires for all allocations, which runs the + // plugin_supervisor_hook. But there's a race and the allocations + // in this scenario are Restored in the opposite order they were + // created + + require.NoError(t, newR.RegisterPlugin(plugin0)) + plugin = dispensePlugin(t, newR) + require.Equal(t, "/var/data/alloc/alloc-1/csi.sock", plugin.ConnectionInfo.SocketPath) + require.Equal(t, "alloc-1", plugin.AllocID) + }) + + t.Run("replacement races on host restart", func(t *testing.T) { + plugin0 := newPlugin(0) + plugin1 := newPlugin(1) + plugin2 := newPlugin(2) + + memdb := &MemDB{} + oldR := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn}) + + // add a plugin and a new alloc running the same plugin + // (without stopping the old one) + require.NoError(t, oldR.RegisterPlugin(plugin0)) + require.NoError(t, oldR.RegisterPlugin(plugin1)) + plugin := dispensePlugin(t, oldR) + require.Equal(t, "alloc-1", plugin.AllocID) + + // client restarts and we load state from disk. 
+ // most recently inserted plugin is current + + newR := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn}) + plugin = dispensePlugin(t, oldR) + require.Equal(t, "/var/data/alloc/alloc-1/csi.sock", plugin.ConnectionInfo.SocketPath) + require.Equal(t, "alloc-1", plugin.AllocID) + + // RestoreTask fires for all allocations but none of them are + // running because we restarted the whole host. Server gives + // us a replacement alloc + + require.NoError(t, newR.RegisterPlugin(plugin2)) + plugin = dispensePlugin(t, newR) + require.Equal(t, "/var/data/alloc/alloc-2/csi.sock", plugin.ConnectionInfo.SocketPath) + require.Equal(t, "alloc-2", plugin.AllocID) + }) + + t.Run("interleaved register and deregister", func(t *testing.T) { + plugin0 := newPlugin(0) + plugin1 := newPlugin(1) + + memdb := &MemDB{} + reg := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn}) + + require.NoError(t, reg.RegisterPlugin(plugin0)) + + // replacement is registered before old plugin deregisters + require.NoError(t, reg.RegisterPlugin(plugin1)) + plugin := dispensePlugin(t, reg) + require.Equal(t, "alloc-1", plugin.AllocID) + + reg.DeregisterPlugin(PluginTypeCSINode, "my-plugin", "alloc-0") + plugin = dispensePlugin(t, reg) + require.Equal(t, "alloc-1", plugin.AllocID) + }) + +} + // MemDB implements a StateDB that stores data in memory and should only be // used for testing. All methods are safe for concurrent use. This is a // partial implementation of the MemDB in the client/state package, copied diff --git a/client/pluginmanager/csimanager/manager.go b/client/pluginmanager/csimanager/manager.go index 11dc66b4c..d2b5bf78d 100644 --- a/client/pluginmanager/csimanager/manager.go +++ b/client/pluginmanager/csimanager/manager.go @@ -55,7 +55,7 @@ func New(config *Config) Manager { type csiManager struct { // instances should only be accessed from the run() goroutine and the shutdown - // fn. It is a map of PluginType : [PluginName : instanceManager] + // fn. 
It is a map of PluginType : [PluginName : *instanceManager] instances map[string]map[string]*instanceManager registry dynamicplugins.Registry @@ -167,11 +167,19 @@ func (c *csiManager) ensureInstance(plugin *dynamicplugins.PluginInfo) { name := plugin.Name ptype := plugin.Type instances := c.instancesForType(ptype) - if _, ok := instances[name]; !ok { - c.logger.Debug("detected new CSI plugin", "name", name, "type", ptype) + mgr, ok := instances[name] + if !ok { + c.logger.Debug("detected new CSI plugin", "name", name, "type", ptype, "alloc", plugin.AllocID) mgr := newInstanceManager(c.logger, c.eventer, c.updateNodeCSIInfoFunc, plugin) instances[name] = mgr mgr.run() + } else if mgr.allocID != plugin.AllocID { + mgr.shutdown() + c.logger.Debug("detected update for CSI plugin", "name", name, "type", ptype, "alloc", plugin.AllocID) + mgr := newInstanceManager(c.logger, c.eventer, c.updateNodeCSIInfoFunc, plugin) + instances[name] = mgr + mgr.run() + } } @@ -182,9 +190,11 @@ func (c *csiManager) ensureNoInstance(plugin *dynamicplugins.PluginInfo) { ptype := plugin.Type instances := c.instancesForType(ptype) if mgr, ok := instances[name]; ok { - c.logger.Debug("shutting down CSI plugin", "name", name, "type", ptype) - mgr.shutdown() - delete(instances, name) + if mgr.allocID == plugin.AllocID { + c.logger.Debug("shutting down CSI plugin", "name", name, "type", ptype, "alloc", plugin.AllocID) + mgr.shutdown() + delete(instances, name) + } } } diff --git a/client/pluginmanager/csimanager/manager_test.go b/client/pluginmanager/csimanager/manager_test.go index f6c3f381d..6ece1df46 100644 --- a/client/pluginmanager/csimanager/manager_test.go +++ b/client/pluginmanager/csimanager/manager_test.go @@ -1,6 +1,8 @@ package csimanager import ( + "fmt" + "sync" "testing" "time" @@ -13,100 +15,85 @@ import ( var _ pluginmanager.PluginManager = (*csiManager)(nil) -var fakePlugin = &dynamicplugins.PluginInfo{ - Name: "my-plugin", - Type: "csi-controller", - ConnectionInfo: &dynamicplugins.PluginConnectionInfo{}, +func fakePlugin(idx int, pluginType string) *dynamicplugins.PluginInfo { + id := fmt.Sprintf("alloc-%d", idx) + return &dynamicplugins.PluginInfo{ + Name: "my-plugin", + Type: pluginType, + Version: fmt.Sprintf("v%d", idx), + ConnectionInfo: &dynamicplugins.PluginConnectionInfo{ + SocketPath: "/var/data/alloc/" + id + "/csi.sock"}, + AllocID: id, + } } -func setupRegistry() dynamicplugins.Registry { +func testManager(t *testing.T, registry dynamicplugins.Registry, resyncPeriod time.Duration) *csiManager { + return New(&Config{ + Logger: testlog.HCLogger(t), + DynamicRegistry: registry, + UpdateNodeCSIInfoFunc: func(string, *structs.CSIInfo) {}, + PluginResyncPeriod: resyncPeriod, + }).(*csiManager) +} + +func setupRegistry(reg *MemDB) dynamicplugins.Registry { return dynamicplugins.NewRegistry( - nil, + reg, map[string]dynamicplugins.PluginDispenser{ - "csi-controller": func(*dynamicplugins.PluginInfo) (interface{}, error) { - return nil, nil + "csi-controller": func(i *dynamicplugins.PluginInfo) (interface{}, error) { + return i, nil + }, + "csi-node": func(i *dynamicplugins.PluginInfo) (interface{}, error) { + return i, nil }, }) } -func TestManager_Setup_Shutdown(t *testing.T) { - r := setupRegistry() - defer r.Shutdown() - - cfg := &Config{ - Logger: testlog.HCLogger(t), - DynamicRegistry: r, - UpdateNodeCSIInfoFunc: func(string, *structs.CSIInfo) {}, - } - pm := New(cfg).(*csiManager) - pm.Run() - pm.Shutdown() -} - func TestManager_RegisterPlugin(t *testing.T) { - registry := setupRegistry() 
+ registry := setupRegistry(nil) defer registry.Shutdown() - - require.NotNil(t, registry) - - cfg := &Config{ - Logger: testlog.HCLogger(t), - DynamicRegistry: registry, - UpdateNodeCSIInfoFunc: func(string, *structs.CSIInfo) {}, - } - pm := New(cfg).(*csiManager) + pm := testManager(t, registry, time.Hour) defer pm.Shutdown() - require.NotNil(t, pm.registry) - - err := registry.RegisterPlugin(fakePlugin) - require.Nil(t, err) + plugin := fakePlugin(0, dynamicplugins.PluginTypeCSIController) + err := registry.RegisterPlugin(plugin) + require.NoError(t, err) pm.Run() require.Eventually(t, func() bool { - pmap, ok := pm.instances[fakePlugin.Type] + pmap, ok := pm.instances[plugin.Type] if !ok { return false } - _, ok = pmap[fakePlugin.Name] + _, ok = pmap[plugin.Name] return ok }, 5*time.Second, 10*time.Millisecond) } func TestManager_DeregisterPlugin(t *testing.T) { - registry := setupRegistry() + registry := setupRegistry(nil) defer registry.Shutdown() - - require.NotNil(t, registry) - - cfg := &Config{ - Logger: testlog.HCLogger(t), - DynamicRegistry: registry, - UpdateNodeCSIInfoFunc: func(string, *structs.CSIInfo) {}, - PluginResyncPeriod: 500 * time.Millisecond, - } - pm := New(cfg).(*csiManager) + pm := testManager(t, registry, 500*time.Millisecond) defer pm.Shutdown() - require.NotNil(t, pm.registry) - - err := registry.RegisterPlugin(fakePlugin) - require.Nil(t, err) + plugin := fakePlugin(0, dynamicplugins.PluginTypeCSIController) + err := registry.RegisterPlugin(plugin) + require.NoError(t, err) pm.Run() require.Eventually(t, func() bool { - _, ok := pm.instances[fakePlugin.Type][fakePlugin.Name] + _, ok := pm.instances[plugin.Type][plugin.Name] return ok }, 5*time.Second, 10*time.Millisecond) - err = registry.DeregisterPlugin(fakePlugin.Type, fakePlugin.Name) - require.Nil(t, err) + err = registry.DeregisterPlugin(plugin.Type, plugin.Name, "alloc-0") + require.NoError(t, err) require.Eventually(t, func() bool { - _, ok := pm.instances[fakePlugin.Type][fakePlugin.Name] + _, ok := pm.instances[plugin.Type][plugin.Name] return !ok }, 5*time.Second, 10*time.Millisecond) } @@ -115,47 +102,149 @@ func TestManager_DeregisterPlugin(t *testing.T) { // name but different types (as found with monolith plugins) don't interfere // with each other. 
func TestManager_MultiplePlugins(t *testing.T) { - registry := setupRegistry() + registry := setupRegistry(nil) defer registry.Shutdown() - require.NotNil(t, registry) - - cfg := &Config{ - Logger: testlog.HCLogger(t), - DynamicRegistry: registry, - UpdateNodeCSIInfoFunc: func(string, *structs.CSIInfo) {}, - PluginResyncPeriod: 500 * time.Millisecond, - } - pm := New(cfg).(*csiManager) + pm := testManager(t, registry, 500*time.Millisecond) defer pm.Shutdown() - require.NotNil(t, pm.registry) + controllerPlugin := fakePlugin(0, dynamicplugins.PluginTypeCSIController) + err := registry.RegisterPlugin(controllerPlugin) + require.NoError(t, err) - err := registry.RegisterPlugin(fakePlugin) - require.Nil(t, err) - - fakeNodePlugin := *fakePlugin - fakeNodePlugin.Type = "csi-node" - err = registry.RegisterPlugin(&fakeNodePlugin) - require.Nil(t, err) + nodePlugin := fakePlugin(0, dynamicplugins.PluginTypeCSINode) + err = registry.RegisterPlugin(nodePlugin) + require.NoError(t, err) pm.Run() require.Eventually(t, func() bool { - _, ok := pm.instances[fakePlugin.Type][fakePlugin.Name] + _, ok := pm.instances[controllerPlugin.Type][controllerPlugin.Name] return ok }, 5*time.Second, 10*time.Millisecond) require.Eventually(t, func() bool { - _, ok := pm.instances[fakeNodePlugin.Type][fakeNodePlugin.Name] + _, ok := pm.instances[nodePlugin.Type][nodePlugin.Name] return ok }, 5*time.Second, 10*time.Millisecond) - err = registry.DeregisterPlugin(fakePlugin.Type, fakePlugin.Name) - require.Nil(t, err) + err = registry.DeregisterPlugin(controllerPlugin.Type, controllerPlugin.Name, "alloc-0") + require.NoError(t, err) require.Eventually(t, func() bool { - _, ok := pm.instances[fakePlugin.Type][fakePlugin.Name] + _, ok := pm.instances[controllerPlugin.Type][controllerPlugin.Name] return !ok }, 5*time.Second, 10*time.Millisecond) } + +// TestManager_ConcurrentPlugins exercises the behavior when multiple +// allocations for the same plugin interact +func TestManager_ConcurrentPlugins(t *testing.T) { + + t.Run("replacement races on host restart", func(t *testing.T) { + plugin0 := fakePlugin(0, dynamicplugins.PluginTypeCSINode) + plugin1 := fakePlugin(1, dynamicplugins.PluginTypeCSINode) + plugin2 := fakePlugin(2, dynamicplugins.PluginTypeCSINode) + + db := &MemDB{} + registry := setupRegistry(db) + pm := testManager(t, registry, time.Hour) // no resync except from events + pm.Run() + + require.NoError(t, registry.RegisterPlugin(plugin0)) + require.NoError(t, registry.RegisterPlugin(plugin1)) + require.Eventuallyf(t, func() bool { + im, _ := pm.instances[plugin0.Type][plugin0.Name] + return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" && + im.allocID == "alloc-1" + }, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin did not become active plugin") + + pm.Shutdown() + registry.Shutdown() + + // client restarts and we load state from disk. + // most recently inserted plugin is current + + registry = setupRegistry(db) + defer registry.Shutdown() + pm = testManager(t, registry, time.Hour) + defer pm.Shutdown() + pm.Run() + + require.Eventuallyf(t, func() bool { + im, _ := pm.instances[plugin0.Type][plugin0.Name] + return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" && + im.allocID == "alloc-1" + }, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin was not active after state reload") + + // RestoreTask fires for all allocations but none of them are + // running because we restarted the whole host. 
Server gives + // us a replacement alloc + + require.NoError(t, registry.RegisterPlugin(plugin2)) + require.Eventuallyf(t, func() bool { + im, _ := pm.instances[plugin0.Type][plugin0.Name] + return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-2/csi.sock" && + im.allocID == "alloc-2" + }, 5*time.Second, 10*time.Millisecond, "alloc-2 plugin was not active after replacement") + + }) + + t.Run("interleaved register and deregister", func(t *testing.T) { + plugin0 := fakePlugin(0, dynamicplugins.PluginTypeCSINode) + plugin1 := fakePlugin(1, dynamicplugins.PluginTypeCSINode) + + db := &MemDB{} + registry := setupRegistry(db) + defer registry.Shutdown() + + pm := testManager(t, registry, time.Hour) // no resync except from events + defer pm.Shutdown() + pm.Run() + + require.NoError(t, registry.RegisterPlugin(plugin0)) + require.NoError(t, registry.RegisterPlugin(plugin1)) + + require.Eventuallyf(t, func() bool { + im, _ := pm.instances[plugin0.Type][plugin0.Name] + return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" && + im.allocID == "alloc-1" + }, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin did not become active plugin") + + registry.DeregisterPlugin(dynamicplugins.PluginTypeCSINode, "my-plugin", "alloc-0") + + require.Eventuallyf(t, func() bool { + im, _ := pm.instances[plugin0.Type][plugin0.Name] + return im != nil && + im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" + }, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin should still be active plugin") + }) +} + +// MemDB implements a StateDB that stores data in memory and should only be +// used for testing. All methods are safe for concurrent use. This is a +// partial implementation of the MemDB in the client/state package, copied +// here to avoid circular dependencies. +type MemDB struct { + dynamicManagerPs *dynamicplugins.RegistryState + mu sync.RWMutex +} + +func (m *MemDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState, error) { + if m == nil { + return nil, nil + } + m.mu.Lock() + defer m.mu.Unlock() + return m.dynamicManagerPs, nil +} + +func (m *MemDB) PutDynamicPluginRegistryState(ps *dynamicplugins.RegistryState) error { + if m == nil { + return nil + } + m.mu.Lock() + defer m.mu.Unlock() + m.dynamicManagerPs = ps + return nil +} diff --git a/client/state/12types.go b/client/state/12types.go new file mode 100644 index 000000000..bc93c258e --- /dev/null +++ b/client/state/12types.go @@ -0,0 +1,9 @@ +package state + +import "github.com/hashicorp/nomad/client/dynamicplugins" + +// RegistryState12 is the dynamic plugin registry state persisted +// before 1.3.0. +type RegistryState12 struct { + Plugins map[string]map[string]*dynamicplugins.PluginInfo +} diff --git a/client/state/state_database.go b/client/state/state_database.go index 97a51e96b..5a7fa6749 100644 --- a/client/state/state_database.go +++ b/client/state/state_database.go @@ -52,7 +52,7 @@ var ( // metaVersion is the value of the state schema version to detect when // an upgrade is needed. It skips the usual boltdd/msgpack backend to // be as portable and futureproof as possible. - metaVersion = []byte{'2'} + metaVersion = []byte{'3'} // metaUpgradedKey is the key that stores the timestamp of the last // time the schema was upgraded. 
@@ -90,9 +90,9 @@ var ( // stored at managerPluginStateKey = []byte("plugin_state") - // dynamicPluginBucket is the bucket name containing all dynamic plugin + // dynamicPluginBucketName is the bucket name containing all dynamic plugin // registry data. each dynamic plugin registry will have its own subbucket. - dynamicPluginBucket = []byte("dynamicplugins") + dynamicPluginBucketName = []byte("dynamicplugins") // registryStateKey is the key at which dynamic plugin registry state is stored registryStateKey = []byte("registry_state") @@ -677,7 +677,7 @@ func (s *BoltStateDB) GetDriverPluginState() (*driverstate.PluginState, error) { func (s *BoltStateDB) PutDynamicPluginRegistryState(ps *dynamicplugins.RegistryState) error { return s.db.Update(func(tx *boltdd.Tx) error { // Retrieve the root dynamic plugin manager bucket - dynamicBkt, err := tx.CreateBucketIfNotExists(dynamicPluginBucket) + dynamicBkt, err := tx.CreateBucketIfNotExists(dynamicPluginBucketName) if err != nil { return err } @@ -691,7 +691,7 @@ func (s *BoltStateDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryS var ps *dynamicplugins.RegistryState err := s.db.View(func(tx *boltdd.Tx) error { - dynamicBkt := tx.Bucket(dynamicPluginBucket) + dynamicBkt := tx.Bucket(dynamicPluginBucketName) if dynamicBkt == nil { // No state, return return nil @@ -742,11 +742,11 @@ func (s *BoltStateDB) updateWithOptions(opts []WriteOption, updateFn func(tx *bo // 0.9 schema. Creates a backup before upgrading. func (s *BoltStateDB) Upgrade() error { // Check to see if the underlying DB needs upgrading. - upgrade, err := NeedsUpgrade(s.db.BoltDB()) + upgrade09, upgrade13, err := NeedsUpgrade(s.db.BoltDB()) if err != nil { return err } - if !upgrade { + if !upgrade09 && !upgrade13 { // No upgrade needed! return nil } @@ -759,8 +759,16 @@ func (s *BoltStateDB) Upgrade() error { // Perform the upgrade if err := s.db.Update(func(tx *boltdd.Tx) error { - if err := UpgradeAllocs(s.logger, tx); err != nil { - return err + + if upgrade09 { + if err := UpgradeAllocs(s.logger, tx); err != nil { + return err + } + } + if upgrade13 { + if err := UpgradeDynamicPluginRegistry(s.logger, tx); err != nil { + return err + } } // Add standard metadata @@ -773,6 +781,7 @@ func (s *BoltStateDB) Upgrade() error { if err != nil { return err } + return bkt.Put(metaUpgradedKey, time.Now().Format(time.RFC3339)) }); err != nil { return err diff --git a/client/state/testdata/state-1.2.6.db.gz b/client/state/testdata/state-1.2.6.db.gz new file mode 100644 index 000000000..745fe5aaf --- /dev/null +++ b/client/state/testdata/state-1.2.6.db.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:bcd7207dd782194e81e1844555d6108bafd8a1f9151d47d5991a64c5f018a851 +size 608 diff --git a/client/state/upgrade.go b/client/state/upgrade.go index 5942105d5..63ccc88b3 100644 --- a/client/state/upgrade.go +++ b/client/state/upgrade.go @@ -2,21 +2,24 @@ package state import ( "bytes" + "container/list" "fmt" "os" "github.com/boltdb/bolt" hclog "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/nomad/client/dynamicplugins" "github.com/hashicorp/nomad/helper/boltdd" "github.com/hashicorp/nomad/nomad/structs" ) // NeedsUpgrade returns true if the BoltDB needs upgrading or false if it is // already up to date. 
-func NeedsUpgrade(bdb *bolt.DB) (bool, error) { - needsUpgrade := true - err := bdb.View(func(tx *bolt.Tx) error { +func NeedsUpgrade(bdb *bolt.DB) (upgradeTo09, upgradeTo13 bool, err error) { + upgradeTo09 = true + upgradeTo13 = true + err = bdb.View(func(tx *bolt.Tx) error { b := tx.Bucket(metaBucketName) if b == nil { // No meta bucket; upgrade @@ -29,18 +32,23 @@ func NeedsUpgrade(bdb *bolt.DB) (bool, error) { return nil } - if !bytes.Equal(v, metaVersion) { - // Version exists but does not match. Abort. - return fmt.Errorf("incompatible state version. expected %q but found %q", - metaVersion, v) + if bytes.Equal(v, []byte{'2'}) { + upgradeTo09 = false + return nil + } + if bytes.Equal(v, metaVersion) { + upgradeTo09 = false + upgradeTo13 = false + return nil } - // Version matches! Assume migrated! - needsUpgrade = false - return nil + // Version exists but does not match. Abort. + return fmt.Errorf("incompatible state version. expected %q but found %q", + metaVersion, v) + }) - return needsUpgrade, err + return } // addMeta adds version metadata to BoltDB to mark it as upgraded and @@ -51,7 +59,6 @@ func addMeta(tx *bolt.Tx) error { if err != nil { return err } - return bkt.Put(metaVersionKey, metaVersion) } @@ -312,3 +319,32 @@ func upgradeOldAllocMutable(tx *boltdd.Tx, allocID string, oldBytes []byte) erro return nil } + +func UpgradeDynamicPluginRegistry(logger hclog.Logger, tx *boltdd.Tx) error { + + dynamicBkt := tx.Bucket(dynamicPluginBucketName) + if dynamicBkt == nil { + return nil // no previous plugins upgrade + } + + oldState := &RegistryState12{} + if err := dynamicBkt.Get(registryStateKey, oldState); err != nil { + if !boltdd.IsErrNotFound(err) { + return fmt.Errorf("failed to read dynamic plugin registry state: %v", err) + } + } + + newState := &dynamicplugins.RegistryState{ + Plugins: make(map[string]map[string]*list.List), + } + + for ptype, plugins := range oldState.Plugins { + newState.Plugins[ptype] = make(map[string]*list.List) + for pname, pluginInfo := range plugins { + newState.Plugins[ptype][pname] = list.New() + entry := list.Element{Value: pluginInfo} + newState.Plugins[ptype][pname].PushFront(entry) + } + } + return dynamicBkt.Put(registryStateKey, newState) +} diff --git a/client/state/upgrade_int_test.go b/client/state/upgrade_int_test.go index f81e371bf..540b3f33b 100644 --- a/client/state/upgrade_int_test.go +++ b/client/state/upgrade_int_test.go @@ -1,6 +1,7 @@ package state_test import ( + "bytes" "compress/gzip" "fmt" "io" @@ -10,6 +11,7 @@ import ( "strings" "testing" + "github.com/boltdb/bolt" "github.com/hashicorp/nomad/client/allocrunner" "github.com/hashicorp/nomad/client/allocwatcher" clientconfig "github.com/hashicorp/nomad/client/config" @@ -32,40 +34,49 @@ import ( func TestBoltStateDB_UpgradeOld_Ok(t *testing.T) { t.Parallel() - files, err := filepath.Glob("testdata/*.db*") - require.NoError(t, err) + dbFromTestFile := func(t *testing.T, dir, fn string) *BoltStateDB { - for _, fn := range files { + var src io.ReadCloser + src, err := os.Open(fn) + require.NoError(t, err) + defer src.Close() + + // testdata may be gzip'd; decode on copy + if strings.HasSuffix(fn, ".gz") { + src, err = gzip.NewReader(src) + require.NoError(t, err) + } + + dst, err := os.Create(filepath.Join(dir, "state.db")) + require.NoError(t, err) + + // Copy test files before testing them for safety + _, err = io.Copy(dst, src) + require.NoError(t, err) + + require.NoError(t, src.Close()) + + dbI, err := NewBoltStateDB(testlog.HCLogger(t), dir) + require.NoError(t, err) 
+ + db := dbI.(*BoltStateDB) + return db + } + + pre09files := []string{ + "testdata/state-0.7.1.db.gz", + "testdata/state-0.8.6-empty.db.gz", + "testdata/state-0.8.6-no-deploy.db.gz"} + + for _, fn := range pre09files { t.Run(fn, func(t *testing.T) { + dir, err := ioutil.TempDir("", "nomadtest") require.NoError(t, err) defer os.RemoveAll(dir) - var src io.ReadCloser - src, err = os.Open(fn) - require.NoError(t, err) - defer src.Close() - - // testdata may be gzip'd; decode on copy - if strings.HasSuffix(fn, ".gz") { - src, err = gzip.NewReader(src) - require.NoError(t, err) - } - - dst, err := os.Create(filepath.Join(dir, "state.db")) - require.NoError(t, err) - - // Copy test files before testing them for safety - _, err = io.Copy(dst, src) - require.NoError(t, err) - - require.NoError(t, src.Close()) - - dbI, err := NewBoltStateDB(testlog.HCLogger(t), dir) - require.NoError(t, err) - defer dbI.Close() - - db := dbI.(*BoltStateDB) + db := dbFromTestFile(t, dir, fn) + defer db.Close() // Simply opening old files should *not* alter them require.NoError(t, db.DB().View(func(tx *boltdd.Tx) error { @@ -76,16 +87,18 @@ func TestBoltStateDB_UpgradeOld_Ok(t *testing.T) { return nil })) - needsUpgrade, err := NeedsUpgrade(db.DB().BoltDB()) + to09, to12, err := NeedsUpgrade(db.DB().BoltDB()) require.NoError(t, err) - require.True(t, needsUpgrade) + require.True(t, to09) + require.True(t, to12) - // Attept the upgrade + // Attempt the upgrade require.NoError(t, db.Upgrade()) - needsUpgrade, err = NeedsUpgrade(db.DB().BoltDB()) + to09, to12, err = NeedsUpgrade(db.DB().BoltDB()) require.NoError(t, err) - require.False(t, needsUpgrade) + require.False(t, to09) + require.False(t, to12) // Ensure Allocations can be restored and // NewAR/AR.Restore do not error. @@ -109,9 +122,59 @@ func TestBoltStateDB_UpgradeOld_Ok(t *testing.T) { } require.NoError(t, db.PutDevicePluginState(ps)) + registry, err := db.GetDynamicPluginRegistryState() + require.Nil(t, registry) + + require.NoError(t, err) require.NoError(t, db.Close()) }) } + + t.Run("testdata/state-1.2.6.db.gz", func(t *testing.T) { + fn := "testdata/state-1.2.6.db.gz" + dir, err := ioutil.TempDir("", "nomadtest") + require.NoError(t, err) + defer os.RemoveAll(dir) + + db := dbFromTestFile(t, dir, fn) + defer db.Close() + + // Simply opening old files should *not* alter them + db.DB().BoltDB().View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("meta")) + if b == nil { + return fmt.Errorf("meta bucket should exist") + } + v := b.Get([]byte("version")) + if len(v) == 0 { + return fmt.Errorf("meta version is missing") + } + if !bytes.Equal(v, []byte{'2'}) { + return fmt.Errorf("meta version should not have changed") + } + return nil + }) + + to09, to12, err := NeedsUpgrade(db.DB().BoltDB()) + require.NoError(t, err) + require.False(t, to09) + require.True(t, to12) + + // Attempt the upgrade + require.NoError(t, db.Upgrade()) + + to09, to12, err = NeedsUpgrade(db.DB().BoltDB()) + require.NoError(t, err) + require.False(t, to09) + require.False(t, to12) + + registry, err := db.GetDynamicPluginRegistryState() + require.NoError(t, err) + require.NotNil(t, registry) + require.Len(t, registry.Plugins["csi-node"], 2) + + require.NoError(t, db.Close()) + }) } // checkUpgradedAlloc creates and restores an AllocRunner from an upgraded diff --git a/client/state/upgrade_test.go b/client/state/upgrade_test.go index 571c12a55..1bf36273f 100644 --- a/client/state/upgrade_test.go +++ b/client/state/upgrade_test.go @@ -38,9 +38,10 @@ func TestUpgrade_NeedsUpgrade_New(t 
*testing.T) { db, cleanup := setupBoltStateDB(t) defer cleanup() - up, err := NeedsUpgrade(db.DB().BoltDB()) + to09, to12, err := NeedsUpgrade(db.DB().BoltDB()) require.NoError(t, err) - require.False(t, up) + require.False(t, to09) + require.False(t, to12) } // TestUpgrade_NeedsUpgrade_Old asserts state dbs with just the alloctions @@ -58,16 +59,18 @@ func TestUpgrade_NeedsUpgrade_Old(t *testing.T) { return err })) - up, err := NeedsUpgrade(db) + to09, to12, err := NeedsUpgrade(db) require.NoError(t, err) - require.True(t, up) + require.True(t, to09) + require.True(t, to12) // Adding meta should mark it as upgraded require.NoError(t, db.Update(addMeta)) - up, err = NeedsUpgrade(db) + to09, to12, err = NeedsUpgrade(db) require.NoError(t, err) - require.False(t, up) + require.False(t, to09) + require.False(t, to12) } // TestUpgrade_NeedsUpgrade_Error asserts that an error is returned from @@ -79,7 +82,7 @@ func TestUpgrade_NeedsUpgrade_Error(t *testing.T) { cases := [][]byte{ {'"', '2', '"'}, // wrong type {'1'}, // wrong version (never existed) - {'3'}, // wrong version (future) + {'4'}, // wrong version (future) } for _, tc := range cases { @@ -95,7 +98,7 @@ func TestUpgrade_NeedsUpgrade_Error(t *testing.T) { return bkt.Put(metaVersionKey, tc) })) - _, err := NeedsUpgrade(db) + _, _, err := NeedsUpgrade(db) require.Error(t, err) }) } diff --git a/website/content/docs/upgrade/upgrade-specific.mdx b/website/content/docs/upgrade/upgrade-specific.mdx index a29c28697..16473d896 100644 --- a/website/content/docs/upgrade/upgrade-specific.mdx +++ b/website/content/docs/upgrade/upgrade-specific.mdx @@ -25,6 +25,32 @@ In Nomad 1.3.0, the default raft protocol version has been updated to server will automatically upgrade that server's raft protocol. See the [Upgrading to Raft Protocol 3] guide. +#### Client State Store + +The client state store will be automatically migrated to a new schema +version when upgrading a client. + +Downgrading to a previous version of the client after upgrading it to +Nomad 1.3 is not supported. To downgrade safely, users should drain +all tasks from the Nomad client and erase its data directory. + +#### CSI Plugins + +The client filesystem layout for CSI plugins has been updated to +correctly handle the lifecycle of multiple allocations serving the +same plugin. Running plugin tasks will not be updated after upgrading +the client, but it is recommended to redeploy CSI plugin jobs after +upgrading the cluster. + +The directory for plugin control sockets will be mounted from a new +per-allocation directory in the client data dir. This will still be +bind-mounted to `csi_plugin.mount_config` as in versions of Nomad +prior to 1.3.0. + +The volume staging directory for new CSI plugin tasks will now be +mounted to the task's `NOMAD_TASK_DIR` instead of the +`csi_plugin.mount_config`. + ## Nomad 1.2.6, 1.1.12, and 1.0.18 #### ACL requirement for the job parse endpoint
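
---

A note on the control-socket change in `plugin_supervisor_hook.go` above: each plugin allocation now gets its own socket directory under the client data dir (`{datadir}/csi/plugins/{alloc-id}/`), and the `CSI_ENDPOINT` value handed to the task depends on the driver's filesystem isolation. The following is a minimal sketch of that path selection only; the directory and mount values are invented for the example, and the real hook additionally consults the registry via `PluginForAlloc` so that restored allocations keep their previously assigned socket path.

```go
package main

import (
	"fmt"
	"path/filepath"
)

const csiSocketName = "csi.sock"

// endpointForTask sketches the CSI_ENDPOINT selection described in the hook:
// plugins with no filesystem isolation are pointed at the host path of the
// per-allocation socket, while isolated plugins are pointed at the socket
// under their csi_plugin.mount_dir bind mount.
func endpointForTask(isolated bool, hostSocketDir, mountDir string) string {
	if !isolated {
		return filepath.Join(hostSocketDir, csiSocketName)
	}
	return filepath.Join(mountDir, csiSocketName)
}

func main() {
	// Invented example values: a client data dir and a truncated alloc ID.
	hostSocketDir := filepath.Join("/var/nomad", "csi", "plugins", "8a33df29")
	mountDir := "/csi" // csi_plugin.mount_dir inside the plugin task

	fmt.Println(endpointForTask(false, hostSocketDir, mountDir)) // /var/nomad/csi/plugins/8a33df29/csi.sock
	fmt.Println(endpointForTask(true, hostSocketDir, mountDir))  // /csi/csi.sock
}
```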
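The core behavioral change in `client/dynamicplugins/registry.go` is that the registry keeps a recency-ordered `container/list` of registrations per plugin name instead of a single `PluginInfo`. Below is a minimal, self-contained sketch of that idea using invented types (`miniRegistry`, `regInfo`) rather than Nomad's actual structs: the newest registration is treated as the current instance, duplicate registrations for the same allocation are ignored, and deregistering an older allocation leaves the current one in place.

```go
package main

import (
	"container/list"
	"fmt"
)

type regInfo struct {
	Name    string
	Type    string
	AllocID string
}

type miniRegistry struct {
	// plugin type -> plugin name -> registrations, newest first
	plugins map[string]map[string]*list.List
}

func newMiniRegistry() *miniRegistry {
	return &miniRegistry{plugins: map[string]map[string]*list.List{}}
}

func (r *miniRegistry) register(info *regInfo) {
	byName, ok := r.plugins[info.Type]
	if !ok {
		byName = map[string]*list.List{}
		r.plugins[info.Type] = byName
	}
	infos, ok := byName[info.Name]
	if !ok {
		infos = list.New()
		byName[info.Name] = infos
	}
	// Skip duplicate registrations for the same allocation.
	for e := infos.Front(); e != nil; e = e.Next() {
		if e.Value.(*regInfo).AllocID == info.AllocID {
			return
		}
	}
	infos.PushFront(info) // newest registration becomes current
}

func (r *miniRegistry) deregister(ptype, name, allocID string) {
	if infos, ok := r.plugins[ptype][name]; ok {
		for e := infos.Front(); e != nil; e = e.Next() {
			if e.Value.(*regInfo).AllocID == allocID {
				infos.Remove(e)
				return
			}
		}
	}
}

// current returns the most recently registered allocation, if any.
func (r *miniRegistry) current(ptype, name string) (*regInfo, bool) {
	infos, ok := r.plugins[ptype][name]
	if !ok || infos.Front() == nil {
		return nil, false
	}
	return infos.Front().Value.(*regInfo), true
}

func main() {
	r := newMiniRegistry()
	r.register(&regInfo{Name: "my-plugin", Type: "csi-node", AllocID: "alloc-0"})
	// Replacement allocation registers before the old one deregisters.
	r.register(&regInfo{Name: "my-plugin", Type: "csi-node", AllocID: "alloc-1"})
	r.deregister("csi-node", "my-plugin", "alloc-0")

	cur, _ := r.current("csi-node", "my-plugin")
	fmt.Println(cur.AllocID) // alloc-1
}
```

This mirrors the "interleaved register and deregister" case exercised by the registry and csimanager tests in the patch: the replacement allocation registers before the old one deregisters, and the dispensed plugin remains the newest allocation throughout.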
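The client state-store upgrade added in `client/state/upgrade.go` converts the pre-1.3 registry state (one `PluginInfo` per plugin name) into the new list-backed layout. Here is a hedged sketch of that conversion using simplified stand-in types (`oldRegistryState`, `newRegistryState`, and `pluginInfo` are invented for the example, not the exact Nomad structs); in this sketch the plugin info itself is pushed onto each list so that readers which type-assert `Front().Value` get the info back directly.

```go
package main

import (
	"container/list"
	"fmt"
)

type pluginInfo struct {
	Name    string
	AllocID string
}

// pre-1.3 layout: a single PluginInfo per plugin name
type oldRegistryState struct {
	Plugins map[string]map[string]*pluginInfo
}

// 1.3+ layout: a recency-ordered list of PluginInfo per plugin name
type newRegistryState struct {
	Plugins map[string]map[string]*list.List
}

func migrate(old *oldRegistryState) *newRegistryState {
	out := &newRegistryState{Plugins: map[string]map[string]*list.List{}}
	for ptype, plugins := range old.Plugins {
		out.Plugins[ptype] = map[string]*list.List{}
		for pname, info := range plugins {
			l := list.New()
			// Push the info itself so consumers that inspect
			// Front().Value see a *pluginInfo after the upgrade.
			l.PushFront(info)
			out.Plugins[ptype][pname] = l
		}
	}
	return out
}

func main() {
	old := &oldRegistryState{Plugins: map[string]map[string]*pluginInfo{
		"csi-node": {"my-plugin": {Name: "my-plugin", AllocID: "alloc-0"}},
	}}
	migrated := migrate(old)
	front := migrated.Plugins["csi-node"]["my-plugin"].Front().Value.(*pluginInfo)
	fmt.Println(front.AllocID) // alloc-0
}
```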