5a0bcd39d1
Allow for faster updates to plugin status when allocations become terminal by listening for register/deregister events from the dynamic plugin registry (which in turn are triggered by the plugin supervisor hook). The deregistration function closures that we pass up to the CSI plugin manager don't properly close over the name and type of the registration, causing monolith-type plugins to deregister only one of their two plugins on alloc shutdown. Rebind plugin supervisor deregistration targets to fix that. Includes log message and comment improvements
245 lines
5.7 KiB
Go
245 lines
5.7 KiB
Go
package dynamicplugins
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestPluginEventBroadcaster_SendsMessagesToAllClients(t *testing.T) {
|
|
t.Parallel()
|
|
b := newPluginEventBroadcaster()
|
|
defer close(b.stopCh)
|
|
var rcv1, rcv2 bool
|
|
|
|
ch1 := b.subscribe()
|
|
ch2 := b.subscribe()
|
|
|
|
listenFunc := func(ch chan *PluginUpdateEvent, updateBool *bool) {
|
|
select {
|
|
case <-ch:
|
|
*updateBool = true
|
|
}
|
|
}
|
|
|
|
go listenFunc(ch1, &rcv1)
|
|
go listenFunc(ch2, &rcv2)
|
|
|
|
b.broadcast(&PluginUpdateEvent{})
|
|
|
|
require.Eventually(t, func() bool {
|
|
return rcv1 == true && rcv2 == true
|
|
}, 1*time.Second, 200*time.Millisecond)
|
|
}
|
|
|
|
func TestPluginEventBroadcaster_UnsubscribeWorks(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
b := newPluginEventBroadcaster()
|
|
defer close(b.stopCh)
|
|
var rcv1 bool
|
|
|
|
ch1 := b.subscribe()
|
|
|
|
listenFunc := func(ch chan *PluginUpdateEvent, updateBool *bool) {
|
|
select {
|
|
case e := <-ch:
|
|
if e == nil {
|
|
*updateBool = true
|
|
}
|
|
}
|
|
}
|
|
|
|
go listenFunc(ch1, &rcv1)
|
|
|
|
b.unsubscribe(ch1)
|
|
|
|
b.broadcast(&PluginUpdateEvent{})
|
|
|
|
require.Eventually(t, func() bool {
|
|
return rcv1 == true
|
|
}, 1*time.Second, 200*time.Millisecond)
|
|
}
|
|
|
|
func TestDynamicRegistry_RegisterPlugin_SendsUpdateEvents(t *testing.T) {
|
|
t.Parallel()
|
|
r := NewRegistry(nil, nil)
|
|
|
|
ctx, cancelFn := context.WithCancel(context.Background())
|
|
defer cancelFn()
|
|
|
|
ch := r.PluginsUpdatedCh(ctx, "csi")
|
|
receivedRegistrationEvent := false
|
|
|
|
listenFunc := func(ch <-chan *PluginUpdateEvent, updateBool *bool) {
|
|
select {
|
|
case e := <-ch:
|
|
if e == nil {
|
|
return
|
|
}
|
|
|
|
if e.EventType == EventTypeRegistered {
|
|
*updateBool = true
|
|
}
|
|
}
|
|
}
|
|
|
|
go listenFunc(ch, &receivedRegistrationEvent)
|
|
|
|
err := r.RegisterPlugin(&PluginInfo{
|
|
Type: "csi",
|
|
Name: "my-plugin",
|
|
ConnectionInfo: &PluginConnectionInfo{},
|
|
})
|
|
|
|
require.NoError(t, err)
|
|
|
|
require.Eventually(t, func() bool {
|
|
return receivedRegistrationEvent == true
|
|
}, 1*time.Second, 200*time.Millisecond)
|
|
}
|
|
|
|
func TestDynamicRegistry_DeregisterPlugin_SendsUpdateEvents(t *testing.T) {
|
|
t.Parallel()
|
|
r := NewRegistry(nil, nil)
|
|
|
|
ctx, cancelFn := context.WithCancel(context.Background())
|
|
defer cancelFn()
|
|
|
|
ch := r.PluginsUpdatedCh(ctx, "csi")
|
|
receivedDeregistrationEvent := false
|
|
|
|
listenFunc := func(ch <-chan *PluginUpdateEvent, updateBool *bool) {
|
|
for {
|
|
select {
|
|
case e := <-ch:
|
|
if e == nil {
|
|
return
|
|
}
|
|
|
|
if e.EventType == EventTypeDeregistered {
|
|
*updateBool = true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
go listenFunc(ch, &receivedDeregistrationEvent)
|
|
|
|
err := r.RegisterPlugin(&PluginInfo{
|
|
Type: "csi",
|
|
Name: "my-plugin",
|
|
ConnectionInfo: &PluginConnectionInfo{},
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
err = r.DeregisterPlugin("csi", "my-plugin")
|
|
require.NoError(t, err)
|
|
|
|
require.Eventually(t, func() bool {
|
|
return receivedDeregistrationEvent == true
|
|
}, 1*time.Second, 200*time.Millisecond)
|
|
}
|
|
|
|
func TestDynamicRegistry_DispensePlugin_Works(t *testing.T) {
|
|
dispenseFn := func(i *PluginInfo) (interface{}, error) {
|
|
return struct{}{}, nil
|
|
}
|
|
|
|
registry := NewRegistry(nil, map[string]PluginDispenser{"csi": dispenseFn})
|
|
|
|
err := registry.RegisterPlugin(&PluginInfo{
|
|
Type: "csi",
|
|
Name: "my-plugin",
|
|
ConnectionInfo: &PluginConnectionInfo{},
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
result, err := registry.DispensePlugin("unknown-type", "unknown-name")
|
|
require.Nil(t, result)
|
|
require.EqualError(t, err, "no plugin dispenser found for type: unknown-type")
|
|
|
|
result, err = registry.DispensePlugin("csi", "unknown-name")
|
|
require.Nil(t, result)
|
|
require.EqualError(t, err, "plugin unknown-name for type csi not found")
|
|
|
|
result, err = registry.DispensePlugin("csi", "my-plugin")
|
|
require.NotNil(t, result)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
func TestDynamicRegistry_IsolatePluginTypes(t *testing.T) {
|
|
t.Parallel()
|
|
r := NewRegistry(nil, nil)
|
|
|
|
err := r.RegisterPlugin(&PluginInfo{
|
|
Type: PluginTypeCSIController,
|
|
Name: "my-plugin",
|
|
ConnectionInfo: &PluginConnectionInfo{},
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
err = r.RegisterPlugin(&PluginInfo{
|
|
Type: PluginTypeCSINode,
|
|
Name: "my-plugin",
|
|
ConnectionInfo: &PluginConnectionInfo{},
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
err = r.DeregisterPlugin(PluginTypeCSIController, "my-plugin")
|
|
require.NoError(t, err)
|
|
require.Equal(t, len(r.ListPlugins(PluginTypeCSINode)), 1)
|
|
require.Equal(t, len(r.ListPlugins(PluginTypeCSIController)), 0)
|
|
}
|
|
|
|
func TestDynamicRegistry_StateStore(t *testing.T) {
|
|
t.Parallel()
|
|
dispenseFn := func(i *PluginInfo) (interface{}, error) {
|
|
return i, nil
|
|
}
|
|
|
|
memdb := &MemDB{}
|
|
oldR := NewRegistry(memdb, map[string]PluginDispenser{"csi": dispenseFn})
|
|
|
|
err := oldR.RegisterPlugin(&PluginInfo{
|
|
Type: "csi",
|
|
Name: "my-plugin",
|
|
ConnectionInfo: &PluginConnectionInfo{},
|
|
})
|
|
require.NoError(t, err)
|
|
result, err := oldR.DispensePlugin("csi", "my-plugin")
|
|
require.NotNil(t, result)
|
|
require.NoError(t, err)
|
|
|
|
// recreate the registry from the state store and query again
|
|
newR := NewRegistry(memdb, map[string]PluginDispenser{"csi": dispenseFn})
|
|
result, err = newR.DispensePlugin("csi", "my-plugin")
|
|
require.NotNil(t, result)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
// MemDB implements a StateDB that stores data in memory and should only be
|
|
// used for testing. All methods are safe for concurrent use. This is a
|
|
// partial implementation of the MemDB in the client/state package, copied
|
|
// here to avoid circular dependencies.
|
|
type MemDB struct {
|
|
dynamicManagerPs *RegistryState
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
func (m *MemDB) GetDynamicPluginRegistryState() (*RegistryState, error) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
return m.dynamicManagerPs, nil
|
|
}
|
|
|
|
func (m *MemDB) PutDynamicPluginRegistryState(ps *RegistryState) error {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
m.dynamicManagerPs = ps
|
|
return nil
|
|
}
|