540 lines
14 KiB
Go
540 lines
14 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
// dynamicplugins is a package that manages dynamic plugins in Nomad.
|
|
// It exposes a registry that allows for plugins to be registered/deregistered
|
|
// and also allows subscribers to receive real time updates of these events.
|
|
package dynamicplugins
|
|
|
|
import (
|
|
"container/list"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/hashicorp/nomad/helper"
|
|
)
|
|
|
|
const (
|
|
PluginTypeCSIController = "csi-controller"
|
|
PluginTypeCSINode = "csi-node"
|
|
)
|
|
|
|
// Registry is an interface that allows for the dynamic registration of plugins
|
|
// that are running as Nomad Tasks.
|
|
type Registry interface {
|
|
RegisterPlugin(info *PluginInfo) error
|
|
DeregisterPlugin(ptype, name, allocID string) error
|
|
|
|
WaitForPlugin(ctx context.Context, ptype, pname string) (*PluginInfo, error)
|
|
ListPlugins(ptype string) []*PluginInfo
|
|
DispensePlugin(ptype, name string) (interface{}, error)
|
|
PluginForAlloc(ptype, name, allocID string) (*PluginInfo, error)
|
|
|
|
PluginsUpdatedCh(ctx context.Context, ptype string) <-chan *PluginUpdateEvent
|
|
|
|
Shutdown()
|
|
|
|
StubDispenserForType(ptype string, dispenser PluginDispenser)
|
|
}
|
|
|
|
// RegistryState is what we persist in the client state
|
|
// store. It contains a map of plugin types to maps of plugin name ->
|
|
// list of *PluginInfo, sorted by recency of registration
|
|
type RegistryState struct {
|
|
Plugins map[string]map[string]*list.List
|
|
}
|
|
|
|
type PluginDispenser func(info *PluginInfo) (interface{}, error)
|
|
|
|
// NewRegistry takes a map of `plugintype` to PluginDispenser functions
|
|
// that should be used to vend clients for plugins to be used.
|
|
func NewRegistry(state StateStorage, dispensers map[string]PluginDispenser) Registry {
|
|
|
|
registry := &dynamicRegistry{
|
|
plugins: make(map[string]map[string]*list.List),
|
|
broadcasters: make(map[string]*pluginEventBroadcaster),
|
|
dispensers: dispensers,
|
|
state: state,
|
|
}
|
|
|
|
// populate the state and initial broadcasters if we have an
|
|
// existing state DB to restore
|
|
if state != nil {
|
|
storedState, err := state.GetDynamicPluginRegistryState()
|
|
if err == nil && storedState != nil {
|
|
registry.plugins = storedState.Plugins
|
|
for ptype := range registry.plugins {
|
|
registry.broadcasterForPluginType(ptype)
|
|
}
|
|
}
|
|
}
|
|
|
|
return registry
|
|
}
|
|
|
|
// StateStorage is used to persist the dynamic plugin registry's state
|
|
// across agent restarts.
|
|
type StateStorage interface {
|
|
// GetDynamicPluginRegistryState is used to restore the registry state
|
|
GetDynamicPluginRegistryState() (*RegistryState, error)
|
|
|
|
// PutDynamicPluginRegistryState is used to store the registry state
|
|
PutDynamicPluginRegistryState(state *RegistryState) error
|
|
}
|
|
|
|
// PluginInfo is the metadata that is stored by the registry for a given plugin.
|
|
type PluginInfo struct {
|
|
Name string
|
|
Type string
|
|
Version string
|
|
|
|
// ConnectionInfo should only be used externally during `RegisterPlugin` and
|
|
// may not be exposed in the future.
|
|
ConnectionInfo *PluginConnectionInfo
|
|
|
|
// AllocID tracks the allocation running the plugin
|
|
AllocID string
|
|
|
|
// Options is used for plugin registrations to pass further metadata along to
|
|
// other subsystems
|
|
Options map[string]string
|
|
}
|
|
|
|
// PluginConnectionInfo is the data required to connect to the plugin.
|
|
// note: We currently only support Unix Domain Sockets, but this may be expanded
|
|
//
|
|
// to support other connection modes in the future.
|
|
type PluginConnectionInfo struct {
|
|
// SocketPath is the path to the plugins api socket.
|
|
SocketPath string
|
|
}
|
|
|
|
// EventType is the enum of events that will be emitted by a Registry's
|
|
// PluginsUpdatedCh.
|
|
type EventType string
|
|
|
|
const (
|
|
// EventTypeRegistered is emitted by the Registry when a new plugin has been
|
|
// registered.
|
|
EventTypeRegistered EventType = "registered"
|
|
// EventTypeDeregistered is emitted by the Registry when a plugin has been
|
|
// removed.
|
|
EventTypeDeregistered EventType = "deregistered"
|
|
)
|
|
|
|
// PluginUpdateEvent is a struct that is sent over a PluginsUpdatedCh when
|
|
// plugins are added or removed from the registry.
|
|
type PluginUpdateEvent struct {
|
|
EventType EventType
|
|
Info *PluginInfo
|
|
}
|
|
|
|
type dynamicRegistry struct {
|
|
plugins map[string]map[string]*list.List
|
|
pluginsLock sync.RWMutex
|
|
|
|
broadcasters map[string]*pluginEventBroadcaster
|
|
broadcastersLock sync.Mutex
|
|
|
|
dispensers map[string]PluginDispenser
|
|
stubDispensers map[string]PluginDispenser
|
|
|
|
state StateStorage
|
|
}
|
|
|
|
// StubDispenserForType allows test functions to provide alternative plugin
|
|
// dispensers to simplify writing tests for higher level Nomad features.
|
|
// This function should not be called from production code.
|
|
func (d *dynamicRegistry) StubDispenserForType(ptype string, dispenser PluginDispenser) {
|
|
// delete from stubs
|
|
if dispenser == nil && d.stubDispensers != nil {
|
|
delete(d.stubDispensers, ptype)
|
|
if len(d.stubDispensers) == 0 {
|
|
d.stubDispensers = nil
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// setup stubs
|
|
if d.stubDispensers == nil {
|
|
d.stubDispensers = make(map[string]PluginDispenser, 1)
|
|
}
|
|
|
|
d.stubDispensers[ptype] = dispenser
|
|
}
|
|
|
|
func (d *dynamicRegistry) RegisterPlugin(info *PluginInfo) error {
|
|
if info.Type == "" {
|
|
// This error shouldn't make it to a production cluster and is to aid
|
|
// developers during the development of new plugin types.
|
|
return errors.New("Plugin.Type must not be empty")
|
|
}
|
|
|
|
if info.ConnectionInfo == nil {
|
|
// This error shouldn't make it to a production cluster and is to aid
|
|
// developers during the development of new plugin types.
|
|
return errors.New("Plugin.ConnectionInfo must not be nil")
|
|
}
|
|
|
|
if info.Name == "" {
|
|
// This error shouldn't make it to a production cluster and is to aid
|
|
// developers during the development of new plugin types.
|
|
return errors.New("Plugin.Name must not be empty")
|
|
}
|
|
|
|
d.pluginsLock.Lock()
|
|
defer d.pluginsLock.Unlock()
|
|
|
|
pmap, ok := d.plugins[info.Type]
|
|
if !ok {
|
|
pmap = make(map[string]*list.List)
|
|
d.plugins[info.Type] = pmap
|
|
}
|
|
infos, ok := pmap[info.Name]
|
|
if !ok {
|
|
infos = list.New()
|
|
pmap[info.Name] = infos
|
|
}
|
|
|
|
// TODO(tgross): https://github.com/hashicorp/nomad/issues/11786
|
|
// If we're already registered, we should update the definition
|
|
// and send a broadcast of any update so the instanceManager can
|
|
// be restarted if there's been a change
|
|
var alreadyRegistered bool
|
|
for e := infos.Front(); e != nil; e = e.Next() {
|
|
if e.Value.(*PluginInfo).AllocID == info.AllocID {
|
|
alreadyRegistered = true
|
|
break
|
|
}
|
|
}
|
|
if !alreadyRegistered {
|
|
infos.PushFront(info)
|
|
broadcaster := d.broadcasterForPluginType(info.Type)
|
|
event := &PluginUpdateEvent{
|
|
EventType: EventTypeRegistered,
|
|
Info: info,
|
|
}
|
|
broadcaster.broadcast(event)
|
|
}
|
|
|
|
return d.sync()
|
|
}
|
|
|
|
func (d *dynamicRegistry) broadcasterForPluginType(ptype string) *pluginEventBroadcaster {
|
|
d.broadcastersLock.Lock()
|
|
defer d.broadcastersLock.Unlock()
|
|
|
|
broadcaster, ok := d.broadcasters[ptype]
|
|
if !ok {
|
|
broadcaster = newPluginEventBroadcaster()
|
|
d.broadcasters[ptype] = broadcaster
|
|
}
|
|
|
|
return broadcaster
|
|
}
|
|
|
|
func (d *dynamicRegistry) DeregisterPlugin(ptype, name, allocID string) error {
|
|
d.pluginsLock.Lock()
|
|
defer d.pluginsLock.Unlock()
|
|
|
|
if ptype == "" {
|
|
// This error shouldn't make it to a production cluster and is to aid
|
|
// developers during the development of new plugin types.
|
|
return errors.New("must specify plugin type to deregister")
|
|
}
|
|
if name == "" {
|
|
// This error shouldn't make it to a production cluster and is to aid
|
|
// developers during the development of new plugin types.
|
|
return errors.New("must specify plugin name to deregister")
|
|
}
|
|
if allocID == "" {
|
|
return errors.New("must specify plugin allocation ID to deregister")
|
|
}
|
|
|
|
pmap, ok := d.plugins[ptype]
|
|
if !ok {
|
|
// If this occurs there's a bug in the registration handler.
|
|
return fmt.Errorf("no plugins registered for type: %s", ptype)
|
|
}
|
|
|
|
infos, ok := pmap[name]
|
|
if !ok {
|
|
// plugin already deregistered, don't send events or try re-deleting.
|
|
return nil
|
|
}
|
|
|
|
var info *PluginInfo
|
|
for e := infos.Front(); e != nil; e = e.Next() {
|
|
info = e.Value.(*PluginInfo)
|
|
if info.AllocID == allocID {
|
|
infos.Remove(e)
|
|
break
|
|
}
|
|
}
|
|
|
|
if info != nil {
|
|
broadcaster := d.broadcasterForPluginType(ptype)
|
|
event := &PluginUpdateEvent{
|
|
EventType: EventTypeDeregistered,
|
|
Info: info,
|
|
}
|
|
broadcaster.broadcast(event)
|
|
}
|
|
|
|
return d.sync()
|
|
}
|
|
|
|
func (d *dynamicRegistry) ListPlugins(ptype string) []*PluginInfo {
|
|
d.pluginsLock.RLock()
|
|
defer d.pluginsLock.RUnlock()
|
|
|
|
pmap, ok := d.plugins[ptype]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
plugins := make([]*PluginInfo, 0, len(pmap))
|
|
|
|
for _, info := range pmap {
|
|
if info.Front() != nil {
|
|
plugins = append(plugins, info.Front().Value.(*PluginInfo))
|
|
}
|
|
}
|
|
|
|
return plugins
|
|
}
|
|
|
|
// WaitForPlugin repeatedly checks until a plugin with a given type and name
|
|
// becomes available or its context is canceled or times out.
|
|
// Callers should pass in a context with a sensible timeout
|
|
// for the plugin they're expecting to find.
|
|
func (d *dynamicRegistry) WaitForPlugin(ctx context.Context, ptype, name string) (*PluginInfo, error) {
|
|
// this is our actual goal, which may be run repeatedly
|
|
findPlugin := func() *PluginInfo {
|
|
for _, p := range d.ListPlugins(ptype) {
|
|
if p.Name == name {
|
|
return p
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// try immediately first, before any timers get involved
|
|
if p := findPlugin(); p != nil {
|
|
return p, nil
|
|
}
|
|
|
|
// next, loop until found or context is done
|
|
|
|
// these numbers are almost arbitrary...
|
|
delay := 200 // milliseconds between checks, will backoff
|
|
maxDelay := 5000 // up to 5 seconds between each check
|
|
|
|
// put a long upper bound on total time,
|
|
// just in case callers don't follow directions.
|
|
ctx, cancel := context.WithTimeout(ctx, 24*time.Hour)
|
|
defer cancel()
|
|
|
|
timer, stop := helper.NewSafeTimer(time.Duration(delay) * time.Millisecond)
|
|
defer stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
// an externally-defined timeout wins the day
|
|
return nil, ctx.Err()
|
|
case <-timer.C:
|
|
// continue after our internal delay
|
|
}
|
|
|
|
if p := findPlugin(); p != nil {
|
|
return p, nil
|
|
}
|
|
|
|
if delay < maxDelay {
|
|
delay += delay
|
|
}
|
|
if delay > maxDelay {
|
|
delay = maxDelay
|
|
}
|
|
timer.Reset(time.Duration(delay) * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
func (d *dynamicRegistry) DispensePlugin(ptype string, name string) (interface{}, error) {
|
|
d.pluginsLock.Lock()
|
|
defer d.pluginsLock.Unlock()
|
|
|
|
if ptype == "" {
|
|
// This error shouldn't make it to a production cluster and is to aid
|
|
// developers during the development of new plugin types.
|
|
return nil, errors.New("must specify plugin type to dispense")
|
|
}
|
|
if name == "" {
|
|
// This error shouldn't make it to a production cluster and is to aid
|
|
// developers during the development of new plugin types.
|
|
return nil, errors.New("must specify plugin name to dispense")
|
|
}
|
|
|
|
dispenseFunc, ok := d.dispensers[ptype]
|
|
if !ok {
|
|
// This error shouldn't make it to a production cluster and is to aid
|
|
// developers during the development of new plugin types.
|
|
return nil, fmt.Errorf("no plugin dispenser found for type: %s", ptype)
|
|
}
|
|
|
|
// After initially loading the dispenser (to avoid masking missing setup in
|
|
// client/client.go), we then check to see if we have any stub dispensers for
|
|
// this plugin type. If we do, then replace the dispenser fn with the stub.
|
|
if d.stubDispensers != nil {
|
|
if stub, ok := d.stubDispensers[ptype]; ok {
|
|
dispenseFunc = stub
|
|
}
|
|
}
|
|
|
|
pmap, ok := d.plugins[ptype]
|
|
if !ok {
|
|
return nil, fmt.Errorf("no plugins registered for type: %s", ptype)
|
|
}
|
|
|
|
info, ok := pmap[name]
|
|
if !ok || info.Front() == nil {
|
|
return nil, fmt.Errorf("plugin %s for type %s not found", name, ptype)
|
|
}
|
|
|
|
return dispenseFunc(info.Front().Value.(*PluginInfo))
|
|
}
|
|
|
|
func (d *dynamicRegistry) PluginForAlloc(ptype, name, allocID string) (*PluginInfo, error) {
|
|
d.pluginsLock.Lock()
|
|
defer d.pluginsLock.Unlock()
|
|
|
|
pmap, ok := d.plugins[ptype]
|
|
if !ok {
|
|
return nil, fmt.Errorf("no plugins registered for type: %s", ptype)
|
|
}
|
|
|
|
infos, ok := pmap[name]
|
|
if ok {
|
|
for e := infos.Front(); e != nil; e = e.Next() {
|
|
plugin := e.Value.(*PluginInfo)
|
|
if plugin.AllocID == allocID {
|
|
return plugin, nil
|
|
}
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("no plugin for that allocation")
|
|
}
|
|
|
|
// PluginsUpdatedCh returns a channel over which plugin events for the requested
|
|
// plugin type will be emitted. These events are strongly ordered and will never
|
|
// be dropped.
|
|
//
|
|
// The receiving channel _must not_ be closed before the provided context is
|
|
// cancelled.
|
|
func (d *dynamicRegistry) PluginsUpdatedCh(ctx context.Context, ptype string) <-chan *PluginUpdateEvent {
|
|
b := d.broadcasterForPluginType(ptype)
|
|
ch := b.subscribe()
|
|
go func() {
|
|
select {
|
|
case <-b.shutdownCh:
|
|
return
|
|
case <-ctx.Done():
|
|
b.unsubscribe(ch)
|
|
}
|
|
}()
|
|
|
|
return ch
|
|
}
|
|
|
|
func (d *dynamicRegistry) sync() error {
|
|
if d.state != nil {
|
|
storedState := &RegistryState{Plugins: d.plugins}
|
|
return d.state.PutDynamicPluginRegistryState(storedState)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (d *dynamicRegistry) Shutdown() {
|
|
for _, b := range d.broadcasters {
|
|
b.shutdown()
|
|
}
|
|
}
|
|
|
|
type pluginEventBroadcaster struct {
|
|
stopCh chan struct{}
|
|
shutdownCh chan struct{}
|
|
publishCh chan *PluginUpdateEvent
|
|
|
|
subscriptions map[chan *PluginUpdateEvent]struct{}
|
|
subscriptionsLock sync.RWMutex
|
|
}
|
|
|
|
func newPluginEventBroadcaster() *pluginEventBroadcaster {
|
|
b := &pluginEventBroadcaster{
|
|
stopCh: make(chan struct{}),
|
|
shutdownCh: make(chan struct{}),
|
|
publishCh: make(chan *PluginUpdateEvent, 1),
|
|
subscriptions: make(map[chan *PluginUpdateEvent]struct{}),
|
|
}
|
|
go b.run()
|
|
return b
|
|
}
|
|
|
|
func (p *pluginEventBroadcaster) run() {
|
|
for {
|
|
select {
|
|
case <-p.stopCh:
|
|
close(p.shutdownCh)
|
|
return
|
|
case msg := <-p.publishCh:
|
|
p.subscriptionsLock.RLock()
|
|
for msgCh := range p.subscriptions {
|
|
msgCh <- msg
|
|
}
|
|
p.subscriptionsLock.RUnlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *pluginEventBroadcaster) shutdown() {
|
|
close(p.stopCh)
|
|
|
|
// Wait for loop to exit before closing subscriptions
|
|
<-p.shutdownCh
|
|
|
|
p.subscriptionsLock.Lock()
|
|
for sub := range p.subscriptions {
|
|
delete(p.subscriptions, sub)
|
|
close(sub)
|
|
}
|
|
p.subscriptionsLock.Unlock()
|
|
}
|
|
|
|
func (p *pluginEventBroadcaster) broadcast(e *PluginUpdateEvent) {
|
|
p.publishCh <- e
|
|
}
|
|
|
|
func (p *pluginEventBroadcaster) subscribe() chan *PluginUpdateEvent {
|
|
p.subscriptionsLock.Lock()
|
|
defer p.subscriptionsLock.Unlock()
|
|
|
|
ch := make(chan *PluginUpdateEvent, 1)
|
|
p.subscriptions[ch] = struct{}{}
|
|
return ch
|
|
}
|
|
|
|
func (p *pluginEventBroadcaster) unsubscribe(ch chan *PluginUpdateEvent) {
|
|
p.subscriptionsLock.Lock()
|
|
defer p.subscriptionsLock.Unlock()
|
|
|
|
_, ok := p.subscriptions[ch]
|
|
if ok {
|
|
delete(p.subscriptions, ch)
|
|
close(ch)
|
|
}
|
|
}
|