CSI: fix data race in plugin manager (#12553)

The plugin manager for CSI hands out instances of a plugin for callers
that need to mount a volume. The `MounterForPlugin` method accesses
the internal instances map without a lock, and can be called
concurrently from outside the plugin manager's main run-loop.

The original commit for the instances map included a warning that it
needed to be accessed only from the main loop but that comment was
unfortunately ignored shortly thereafter, so this bug has existed in
the code for a couple years without being detected until we ran tests
with `-race` in #12098. Lesson learned here: comments make for lousy
enforcement of invariants!
This commit is contained in:
Tim Gross 2022-04-12 12:18:04 -04:00 committed by GitHub
parent 82027edb2f
commit a135d9b260
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 54 additions and 29 deletions

3
.changelog/12553.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
csi: Fixed bug where accessing plugins was subject to a data race
```

View File

@ -54,9 +54,10 @@ func New(config *Config) Manager {
}
type csiManager struct {
// instances should only be accessed from the run() goroutine and the shutdown
// fn. It is a map of PluginType : [PluginName : *instanceManager]
instances map[string]map[string]*instanceManager
// instances should only be accessed after locking with instancesLock.
// It is a map of PluginType : [PluginName : *instanceManager]
instances map[string]map[string]*instanceManager
instancesLock sync.RWMutex
registry dynamicplugins.Registry
logger hclog.Logger
@ -75,6 +76,8 @@ func (c *csiManager) PluginManager() pluginmanager.PluginManager {
}
func (c *csiManager) MounterForPlugin(ctx context.Context, pluginID string) (VolumeMounter, error) {
c.instancesLock.RLock()
defer c.instancesLock.RUnlock()
nodePlugins, hasAnyNodePlugins := c.instances["csi-node"]
if !hasAnyNodePlugins {
return nil, fmt.Errorf("no storage node plugins found")
@ -118,6 +121,10 @@ func (c *csiManager) runLoop() {
// managers against those in the registry. we primarily will use update
// events from the registry.
func (c *csiManager) resyncPluginsFromRegistry(ptype string) {
c.instancesLock.Lock()
defer c.instancesLock.Unlock()
plugins := c.registry.ListPlugins(ptype)
seen := make(map[string]struct{}, len(plugins))
@ -150,6 +157,9 @@ func (c *csiManager) handlePluginEvent(event *dynamicplugins.PluginUpdateEvent)
"plugin_id", event.Info.Name,
"plugin_alloc_id", event.Info.AllocID)
c.instancesLock.Lock()
defer c.instancesLock.Unlock()
switch event.EventType {
case dynamicplugins.EventTypeRegistered:
c.ensureInstance(event.Info)
@ -163,6 +173,7 @@ func (c *csiManager) handlePluginEvent(event *dynamicplugins.PluginUpdateEvent)
// Ensure we have an instance manager for the plugin and add it to
// the CSI manager's tracking table for that plugin type.
// Assumes that c.instances has been locked.
func (c *csiManager) ensureInstance(plugin *dynamicplugins.PluginInfo) {
name := plugin.Name
ptype := plugin.Type
@ -185,6 +196,7 @@ func (c *csiManager) ensureInstance(plugin *dynamicplugins.PluginInfo) {
// Shut down the instance manager for a plugin and remove it from
// the CSI manager's tracking table for that plugin type.
// Assumes that c.instances has been locked.
func (c *csiManager) ensureNoInstance(plugin *dynamicplugins.PluginInfo) {
name := plugin.Name
ptype := plugin.Type
@ -200,6 +212,7 @@ func (c *csiManager) ensureNoInstance(plugin *dynamicplugins.PluginInfo) {
// Get the instance managers table for a specific plugin type,
// ensuring it's been initialized if it doesn't exist.
// Assumes that c.instances has been locked.
func (c *csiManager) instancesForType(ptype string) map[string]*instanceManager {
pluginMap, ok := c.instances[ptype]
if !ok {

View File

@ -62,13 +62,8 @@ func TestManager_RegisterPlugin(t *testing.T) {
pm.Run()
require.Eventually(t, func() bool {
pmap, ok := pm.instances[plugin.Type]
if !ok {
return false
}
_, ok = pmap[plugin.Name]
return ok
im := instanceManagerByTypeAndName(pm, plugin.Type, plugin.Name)
return im != nil
}, 5*time.Second, 10*time.Millisecond)
}
@ -85,16 +80,16 @@ func TestManager_DeregisterPlugin(t *testing.T) {
pm.Run()
require.Eventually(t, func() bool {
_, ok := pm.instances[plugin.Type][plugin.Name]
return ok
im := instanceManagerByTypeAndName(pm, plugin.Type, plugin.Name)
return im != nil
}, 5*time.Second, 10*time.Millisecond)
err = registry.DeregisterPlugin(plugin.Type, plugin.Name, "alloc-0")
require.NoError(t, err)
require.Eventually(t, func() bool {
_, ok := pm.instances[plugin.Type][plugin.Name]
return !ok
im := instanceManagerByTypeAndName(pm, plugin.Type, plugin.Name)
return im == nil
}, 5*time.Second, 10*time.Millisecond)
}
@ -119,21 +114,21 @@ func TestManager_MultiplePlugins(t *testing.T) {
pm.Run()
require.Eventually(t, func() bool {
_, ok := pm.instances[controllerPlugin.Type][controllerPlugin.Name]
return ok
im := instanceManagerByTypeAndName(pm, controllerPlugin.Type, controllerPlugin.Name)
return im != nil
}, 5*time.Second, 10*time.Millisecond)
require.Eventually(t, func() bool {
_, ok := pm.instances[nodePlugin.Type][nodePlugin.Name]
return ok
im := instanceManagerByTypeAndName(pm, nodePlugin.Type, nodePlugin.Name)
return im != nil
}, 5*time.Second, 10*time.Millisecond)
err = registry.DeregisterPlugin(controllerPlugin.Type, controllerPlugin.Name, "alloc-0")
require.NoError(t, err)
require.Eventually(t, func() bool {
_, ok := pm.instances[controllerPlugin.Type][controllerPlugin.Name]
return !ok
im := instanceManagerByTypeAndName(pm, controllerPlugin.Type, controllerPlugin.Name)
return im == nil
}, 5*time.Second, 10*time.Millisecond)
}
@ -154,8 +149,9 @@ func TestManager_ConcurrentPlugins(t *testing.T) {
require.NoError(t, registry.RegisterPlugin(plugin0))
require.NoError(t, registry.RegisterPlugin(plugin1))
require.Eventuallyf(t, func() bool {
im, _ := pm.instances[plugin0.Type][plugin0.Name]
return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" &&
im := instanceManagerByTypeAndName(pm, plugin0.Type, plugin0.Name)
return im != nil &&
im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" &&
im.allocID == "alloc-1"
}, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin did not become active plugin")
@ -172,8 +168,9 @@ func TestManager_ConcurrentPlugins(t *testing.T) {
pm.Run()
require.Eventuallyf(t, func() bool {
im, _ := pm.instances[plugin0.Type][plugin0.Name]
return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" &&
im := instanceManagerByTypeAndName(pm, plugin0.Type, plugin0.Name)
return im != nil &&
im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" &&
im.allocID == "alloc-1"
}, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin was not active after state reload")
@ -183,8 +180,9 @@ func TestManager_ConcurrentPlugins(t *testing.T) {
require.NoError(t, registry.RegisterPlugin(plugin2))
require.Eventuallyf(t, func() bool {
im, _ := pm.instances[plugin0.Type][plugin0.Name]
return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-2/csi.sock" &&
im := instanceManagerByTypeAndName(pm, plugin0.Type, plugin0.Name)
return im != nil &&
im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-2/csi.sock" &&
im.allocID == "alloc-2"
}, 5*time.Second, 10*time.Millisecond, "alloc-2 plugin was not active after replacement")
@ -206,21 +204,32 @@ func TestManager_ConcurrentPlugins(t *testing.T) {
require.NoError(t, registry.RegisterPlugin(plugin1))
require.Eventuallyf(t, func() bool {
im, _ := pm.instances[plugin0.Type][plugin0.Name]
return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" &&
im := instanceManagerByTypeAndName(pm, plugin0.Type, plugin0.Name)
return im != nil &&
im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" &&
im.allocID == "alloc-1"
}, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin did not become active plugin")
registry.DeregisterPlugin(dynamicplugins.PluginTypeCSINode, "my-plugin", "alloc-0")
require.Eventuallyf(t, func() bool {
im, _ := pm.instances[plugin0.Type][plugin0.Name]
im := instanceManagerByTypeAndName(pm, plugin0.Type, plugin0.Name)
return im != nil &&
im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock"
}, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin should still be active plugin")
})
}
// instanceManagerByTypeAndName is a test helper to get the instance
// manager for the plugin, protected by the lock that the csiManager
// will normally do internally
func instanceManagerByTypeAndName(mgr *csiManager, pluginType, pluginName string) *instanceManager {
mgr.instancesLock.RLock()
defer mgr.instancesLock.RUnlock()
im, _ := mgr.instances[pluginType][pluginName]
return im
}
// MemDB implements a StateDB that stores data in memory and should only be
// used for testing. All methods are safe for concurrent use. This is a
// partial implementation of the MemDB in the client/state package, copied