8df20f49f7
This allows us to correctly clean up internal state during runs of the nomad test suite, e.g., closing eventer contexts correctly.
474 lines
13 KiB
Go
474 lines
13 KiB
Go
package drivermanager
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
log "github.com/hashicorp/go-hclog"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/hashicorp/nomad/plugins/base"
|
|
bstructs "github.com/hashicorp/nomad/plugins/base/structs"
|
|
"github.com/hashicorp/nomad/plugins/drivers"
|
|
"github.com/hashicorp/nomad/plugins/shared/loader"
|
|
"github.com/hashicorp/nomad/plugins/shared/singleton"
|
|
)
|
|
|
|
// driverFPBackoffBaseline is the baseline time for exponential backoff while
// fingerprinting a driver.
const driverFPBackoffBaseline = 5 * time.Second

// driverFPBackoffLimit is the limit of the exponential backoff for fingerprinting
// a driver.
const driverFPBackoffLimit = 2 * time.Minute
|
|
|
|
// instanceManagerConfig configures a driver instance manager
|
|
type instanceManagerConfig struct {
|
|
// Logger is the logger used by the driver instance manager
|
|
Logger log.Logger
|
|
|
|
// Ctx is used to shutdown the driver instance manager
|
|
Ctx context.Context
|
|
|
|
// Loader is the plugin loader
|
|
Loader loader.PluginCatalog
|
|
|
|
// StoreReattach is used to store a plugins reattach config
|
|
StoreReattach StorePluginReattachFn
|
|
|
|
// FetchReattach is used to retrieve a plugin's reattach config
|
|
FetchReattach FetchPluginReattachFn
|
|
|
|
// PluginConfig is the config passed to the launched plugins
|
|
PluginConfig *base.AgentConfig
|
|
|
|
// ID is the ID of the plugin being managed
|
|
ID *loader.PluginID
|
|
|
|
// updateNodeFromDriver is the callback used to update the node from fingerprinting
|
|
UpdateNodeFromDriver UpdateNodeDriverInfoFn
|
|
|
|
// EventHandlerFactory is used to fetch a task event handler
|
|
EventHandlerFactory TaskEventHandlerFactory
|
|
}
|
|
|
|
// instanceManager is used to manage a single driver plugin
|
|
type instanceManager struct {
|
|
// logger is the logger used by the driver instance manager
|
|
logger log.Logger
|
|
|
|
// ctx is used to shutdown the driver manager
|
|
ctx context.Context
|
|
|
|
// cancel is used to shutdown management of this driver plugin
|
|
cancel context.CancelFunc
|
|
|
|
// loader is the plugin loader
|
|
loader loader.PluginCatalog
|
|
|
|
// storeReattach is used to store a plugins reattach config
|
|
storeReattach StorePluginReattachFn
|
|
|
|
// fetchReattach is used to retrieve a plugin's reattach config
|
|
fetchReattach FetchPluginReattachFn
|
|
|
|
// pluginConfig is the config passed to the launched plugins
|
|
pluginConfig *base.AgentConfig
|
|
|
|
// id is the ID of the plugin being managed
|
|
id *loader.PluginID
|
|
|
|
// plugin is the plugin instance being managed
|
|
plugin loader.PluginInstance
|
|
|
|
// driver is the driver plugin being managed
|
|
driver drivers.DriverPlugin
|
|
|
|
// pluginLock locks access to the driver and plugin
|
|
pluginLock sync.Mutex
|
|
|
|
// shutdownLock is used to serialize attempts to shutdown
|
|
shutdownLock sync.Mutex
|
|
|
|
// updateNodeFromDriver is the callback used to update the node from fingerprinting
|
|
updateNodeFromDriver UpdateNodeDriverInfoFn
|
|
|
|
// eventHandlerFactory is used to fetch a handler for a task event
|
|
eventHandlerFactory TaskEventHandlerFactory
|
|
|
|
// firstFingerprintCh is used to trigger that we have successfully
|
|
// fingerprinted once. It is used to gate launching the stats collection.
|
|
firstFingerprintCh chan struct{}
|
|
hasFingerprinted bool
|
|
|
|
// lastHealthState is the last known health fingerprinted by the manager
|
|
lastHealthState drivers.HealthState
|
|
lastHealthStateMu sync.Mutex
|
|
}
|
|
|
|
// newInstanceManager returns a new driver instance manager. It is expected that
|
|
// the context passed in the configuration is cancelled in order to shutdown
|
|
// launched goroutines.
|
|
func newInstanceManager(c *instanceManagerConfig) *instanceManager {
|
|
|
|
ctx, cancel := context.WithCancel(c.Ctx)
|
|
i := &instanceManager{
|
|
logger: c.Logger.With("driver", c.ID.Name),
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
loader: c.Loader,
|
|
storeReattach: c.StoreReattach,
|
|
fetchReattach: c.FetchReattach,
|
|
pluginConfig: c.PluginConfig,
|
|
id: c.ID,
|
|
updateNodeFromDriver: c.UpdateNodeFromDriver,
|
|
eventHandlerFactory: c.EventHandlerFactory,
|
|
firstFingerprintCh: make(chan struct{}),
|
|
}
|
|
|
|
go i.run()
|
|
return i
|
|
}
|
|
|
|
// WaitForFirstFingerprint waits until either the plugin fingerprints, the
|
|
// passed context is done, or the plugin instance manager is shutdown.
|
|
func (i *instanceManager) WaitForFirstFingerprint(ctx context.Context) {
|
|
select {
|
|
case <-i.ctx.Done():
|
|
case <-ctx.Done():
|
|
case <-i.firstFingerprintCh:
|
|
}
|
|
}
|
|
|
|
// run is a long lived goroutine that starts the fingerprinting and stats
|
|
// collection goroutine and then shutsdown the plugin on exit.
|
|
func (i *instanceManager) run() {
|
|
// Dispense once to ensure we are given a valid plugin
|
|
if _, err := i.dispense(); err != nil {
|
|
i.logger.Error("dispensing initial plugin failed", "error", err)
|
|
return
|
|
}
|
|
|
|
// Create a waitgroup to block on shutdown for all created goroutines to
|
|
// exit
|
|
var wg sync.WaitGroup
|
|
|
|
// Start the fingerprinter
|
|
wg.Add(1)
|
|
go func() {
|
|
i.fingerprint()
|
|
wg.Done()
|
|
}()
|
|
|
|
// Start event handler
|
|
wg.Add(1)
|
|
go func() {
|
|
i.handleEvents()
|
|
wg.Done()
|
|
}()
|
|
|
|
// Do a final cleanup
|
|
wg.Wait()
|
|
i.cleanup()
|
|
}
|
|
|
|
// dispense is used to dispense a plugin.
|
|
func (i *instanceManager) dispense() (plugin drivers.DriverPlugin, err error) {
|
|
i.pluginLock.Lock()
|
|
defer i.pluginLock.Unlock()
|
|
|
|
// See if we already have a running instance
|
|
if i.plugin != nil && !i.plugin.Exited() {
|
|
return i.driver, nil
|
|
}
|
|
|
|
var pluginInstance loader.PluginInstance
|
|
|
|
if reattach, ok := i.fetchReattach(); ok {
|
|
// Reattach to existing plugin
|
|
pluginInstance, err = i.loader.Reattach(i.id.Name, i.id.PluginType, reattach)
|
|
} else {
|
|
// Get an instance of the plugin
|
|
pluginInstance, err = i.loader.Dispense(i.id.Name, i.id.PluginType, i.pluginConfig, i.logger)
|
|
}
|
|
if err != nil {
|
|
// Retry as the error just indicates the singleton has exited
|
|
if err == singleton.SingletonPluginExited {
|
|
pluginInstance, err = i.loader.Dispense(i.id.Name, i.id.PluginType, i.pluginConfig, i.logger)
|
|
}
|
|
|
|
// If we still have an error there is a real problem
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to start plugin: %v", err)
|
|
}
|
|
}
|
|
|
|
// Convert to a driver plugin
|
|
driver, ok := pluginInstance.Plugin().(drivers.DriverPlugin)
|
|
if !ok {
|
|
pluginInstance.Kill()
|
|
return nil, fmt.Errorf("plugin loaded does not implement the driver interface")
|
|
}
|
|
|
|
// Store the plugin and driver
|
|
i.plugin = pluginInstance
|
|
i.driver = driver
|
|
|
|
// Store the reattach config
|
|
if c, ok := pluginInstance.ReattachConfig(); ok {
|
|
if err := i.storeReattach(c); err != nil {
|
|
i.logger.Error("error storing driver plugin reattach config", "error", err)
|
|
}
|
|
}
|
|
|
|
return driver, nil
|
|
}
|
|
|
|
// cleanup shutsdown the plugin
|
|
func (i *instanceManager) cleanup() {
|
|
i.shutdownLock.Lock()
|
|
i.pluginLock.Lock()
|
|
defer i.pluginLock.Unlock()
|
|
defer i.shutdownLock.Unlock()
|
|
|
|
if i.plugin == nil {
|
|
return
|
|
}
|
|
|
|
if internalPlugin, ok := i.plugin.Plugin().(drivers.InternalDriverPlugin); ok {
|
|
internalPlugin.Shutdown()
|
|
}
|
|
|
|
if !i.plugin.Exited() {
|
|
i.plugin.Kill()
|
|
if err := i.storeReattach(nil); err != nil {
|
|
i.logger.Warn("error clearing plugin reattach config from state store", "error", err)
|
|
}
|
|
}
|
|
|
|
i.cancel()
|
|
}
|
|
|
|
// dispenseFingerprintCh dispenses a driver and makes a Fingerprint RPC call
|
|
// to the driver. The fingerprint chan is returned along with the cancel func
|
|
// for the context used in the RPC. This cancel func should always be called
|
|
// when the caller is finished with the channel.
|
|
func (i *instanceManager) dispenseFingerprintCh() (<-chan *drivers.Fingerprint, context.CancelFunc, error) {
|
|
driver, err := i.dispense()
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(i.ctx)
|
|
fingerCh, err := driver.Fingerprint(ctx)
|
|
if err != nil {
|
|
cancel()
|
|
return nil, nil, err
|
|
}
|
|
|
|
return fingerCh, cancel, nil
|
|
}
|
|
|
|
// fingerprint is the main loop for fingerprinting.
|
|
func (i *instanceManager) fingerprint() {
|
|
fpChan, cancel, err := i.dispenseFingerprintCh()
|
|
if err != nil {
|
|
i.logger.Error("failed to dispense driver plugin", "error", err)
|
|
}
|
|
|
|
// backoff and retry used if the RPC is closed by the other end
|
|
var backoff time.Duration
|
|
var retry int
|
|
for {
|
|
if backoff > 0 {
|
|
select {
|
|
case <-time.After(backoff):
|
|
case <-i.ctx.Done():
|
|
cancel()
|
|
return
|
|
}
|
|
}
|
|
|
|
select {
|
|
case <-i.ctx.Done():
|
|
cancel()
|
|
return
|
|
case fp, ok := <-fpChan:
|
|
if ok {
|
|
if fp.Err == nil {
|
|
i.handleFingerprint(fp)
|
|
} else {
|
|
i.logger.Warn("received fingerprint error from driver", "error", fp.Err)
|
|
i.handleFingerprintError()
|
|
}
|
|
continue
|
|
}
|
|
|
|
// if the channel is closed attempt to open a new one
|
|
newFpChan, newCancel, err := i.dispenseFingerprintCh()
|
|
if err != nil {
|
|
i.logger.Warn("error fingerprinting driver", "error", err, "retry", retry)
|
|
i.handleFingerprintError()
|
|
|
|
// Calculate the new backoff
|
|
backoff = (1 << (2 * uint64(retry))) * driverFPBackoffBaseline
|
|
if backoff > driverFPBackoffLimit {
|
|
backoff = driverFPBackoffLimit
|
|
}
|
|
// Increment retry counter
|
|
retry++
|
|
continue
|
|
}
|
|
cancel()
|
|
fpChan = newFpChan
|
|
cancel = newCancel
|
|
|
|
// Reset backoff
|
|
backoff = 0
|
|
retry = 0
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleFingerprintError is called when an error occurred while fingerprinting
|
|
// and will set the driver to unhealthy
|
|
func (i *instanceManager) handleFingerprintError() {
|
|
di := &structs.DriverInfo{
|
|
Healthy: false,
|
|
HealthDescription: "failed to fingerprint driver",
|
|
UpdateTime: time.Now(),
|
|
}
|
|
i.updateNodeFromDriver(i.id.Name, di)
|
|
}
|
|
|
|
// handleFingerprint updates the node with the current fingerprint status
|
|
func (i *instanceManager) handleFingerprint(fp *drivers.Fingerprint) {
|
|
attrs := make(map[string]string, len(fp.Attributes))
|
|
for key, attr := range fp.Attributes {
|
|
attrs[key] = attr.GoString()
|
|
}
|
|
di := &structs.DriverInfo{
|
|
Attributes: attrs,
|
|
Detected: fp.Health != drivers.HealthStateUndetected,
|
|
Healthy: fp.Health == drivers.HealthStateHealthy,
|
|
HealthDescription: fp.HealthDescription,
|
|
UpdateTime: time.Now(),
|
|
}
|
|
i.updateNodeFromDriver(i.id.Name, di)
|
|
|
|
// log detected/undetected state changes after the initial fingerprint
|
|
i.lastHealthStateMu.Lock()
|
|
if i.hasFingerprinted {
|
|
if i.lastHealthState != fp.Health {
|
|
i.logger.Info("driver health state has changed", "previous", i.lastHealthState, "current", fp.Health, "description", fp.HealthDescription)
|
|
}
|
|
}
|
|
i.lastHealthState = fp.Health
|
|
i.lastHealthStateMu.Unlock()
|
|
|
|
// if this is the first fingerprint, mark that we have received it
|
|
if !i.hasFingerprinted {
|
|
i.logger.Trace("initial driver fingerprint", "fingerprint", fp)
|
|
close(i.firstFingerprintCh)
|
|
i.hasFingerprinted = true
|
|
}
|
|
}
|
|
|
|
// getLastHealth returns the most recent HealthState from fingerprinting
|
|
func (i *instanceManager) getLastHealth() drivers.HealthState {
|
|
i.lastHealthStateMu.Lock()
|
|
defer i.lastHealthStateMu.Unlock()
|
|
return i.lastHealthState
|
|
}
|
|
|
|
// dispenseTaskEventsCh dispenses a driver plugin and makes a TaskEvents RPC.
|
|
// The TaskEvent chan and cancel func for the RPC is return. The cancel func must
|
|
// be called by the caller to properly cleanup the context
|
|
func (i *instanceManager) dispenseTaskEventsCh() (<-chan *drivers.TaskEvent, context.CancelFunc, error) {
|
|
driver, err := i.dispense()
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(i.ctx)
|
|
eventsCh, err := driver.TaskEvents(ctx)
|
|
if err != nil {
|
|
cancel()
|
|
return nil, nil, err
|
|
}
|
|
|
|
return eventsCh, cancel, nil
|
|
}
|
|
|
|
// handleEvents is the main loop that receives task events from the driver
|
|
func (i *instanceManager) handleEvents() {
|
|
eventsCh, cancel, err := i.dispenseTaskEventsCh()
|
|
if err != nil {
|
|
i.logger.Error("failed to dispense driver", "error", err)
|
|
}
|
|
|
|
var backoff time.Duration
|
|
var retry int
|
|
for {
|
|
if backoff > 0 {
|
|
select {
|
|
case <-time.After(backoff):
|
|
case <-i.ctx.Done():
|
|
cancel()
|
|
return
|
|
}
|
|
}
|
|
|
|
select {
|
|
case <-i.ctx.Done():
|
|
cancel()
|
|
return
|
|
case ev, ok := <-eventsCh:
|
|
if ok {
|
|
i.handleEvent(ev)
|
|
continue
|
|
}
|
|
|
|
// if the channel is closed attempt to open a new one
|
|
newEventsChan, newCancel, err := i.dispenseTaskEventsCh()
|
|
if err != nil {
|
|
i.logger.Warn("failed to receive task events, retrying", "error", err, "retry", retry)
|
|
|
|
// Calculate the new backoff
|
|
backoff = (1 << (2 * uint64(retry))) * driverFPBackoffBaseline
|
|
if backoff > driverFPBackoffLimit {
|
|
backoff = driverFPBackoffLimit
|
|
}
|
|
retry++
|
|
continue
|
|
}
|
|
cancel()
|
|
eventsCh = newEventsChan
|
|
cancel = newCancel
|
|
|
|
// Reset backoff
|
|
backoff = 0
|
|
retry = 0
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleEvent looks up the event handler(s) for the event and runs them
|
|
func (i *instanceManager) handleEvent(ev *drivers.TaskEvent) {
|
|
// Do not emit that the plugin is shutdown
|
|
if ev.Err != nil && ev.Err == bstructs.ErrPluginShutdown {
|
|
return
|
|
}
|
|
|
|
if handler := i.eventHandlerFactory(ev.AllocID, ev.TaskName); handler != nil {
|
|
i.logger.Trace("task event received", "event", ev)
|
|
handler(ev)
|
|
return
|
|
}
|
|
|
|
i.logger.Warn("no handler registered for event", "event", ev)
|
|
}
|