464 lines
13 KiB
Go
464 lines
13 KiB
Go
package drivermanager
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
log "github.com/hashicorp/go-hclog"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/hashicorp/nomad/plugins/base"
|
|
bstructs "github.com/hashicorp/nomad/plugins/base/structs"
|
|
"github.com/hashicorp/nomad/plugins/drivers"
|
|
"github.com/hashicorp/nomad/plugins/shared/loader"
|
|
"github.com/hashicorp/nomad/plugins/shared/singleton"
|
|
)
|
|
|
|
const (
|
|
// driverFPBackoffBaseline is the baseline time for exponential backoff while
|
|
// fingerprinting a driver.
|
|
driverFPBackoffBaseline = 5 * time.Second
|
|
|
|
// driverFPBackoffLimit is the limit of the exponential backoff for fingerprinting
|
|
// a driver.
|
|
driverFPBackoffLimit = 2 * time.Minute
|
|
)
|
|
|
|
// instanceManagerConfig configures a driver instance manager
|
|
type instanceManagerConfig struct {
|
|
// Logger is the logger used by the driver instance manager
|
|
Logger log.Logger
|
|
|
|
// Ctx is used to shutdown the driver instance manager
|
|
Ctx context.Context
|
|
|
|
// Loader is the plugin loader
|
|
Loader loader.PluginCatalog
|
|
|
|
// StoreReattach is used to store a plugins reattach config
|
|
StoreReattach StorePluginReattachFn
|
|
|
|
// FetchReattach is used to retrieve a plugin's reattach config
|
|
FetchReattach FetchPluginReattachFn
|
|
|
|
// PluginConfig is the config passed to the launched plugins
|
|
PluginConfig *base.AgentConfig
|
|
|
|
// ID is the ID of the plugin being managed
|
|
ID *loader.PluginID
|
|
|
|
// updateNodeFromDriver is the callback used to update the node from fingerprinting
|
|
UpdateNodeFromDriver UpdateNodeDriverInfoFn
|
|
|
|
// EventHandlerFactory is used to fetch a task event handler
|
|
EventHandlerFactory TaskEventHandlerFactory
|
|
}
|
|
|
|
// instanceManager is used to manage a single driver plugin
|
|
type instanceManager struct {
|
|
// logger is the logger used by the driver instance manager
|
|
logger log.Logger
|
|
|
|
// ctx is used to shutdown the driver manager
|
|
ctx context.Context
|
|
|
|
// cancel is used to shutdown management of this driver plugin
|
|
cancel context.CancelFunc
|
|
|
|
// loader is the plugin loader
|
|
loader loader.PluginCatalog
|
|
|
|
// storeReattach is used to store a plugins reattach config
|
|
storeReattach StorePluginReattachFn
|
|
|
|
// fetchReattach is used to retrieve a plugin's reattach config
|
|
fetchReattach FetchPluginReattachFn
|
|
|
|
// pluginConfig is the config passed to the launched plugins
|
|
pluginConfig *base.AgentConfig
|
|
|
|
// id is the ID of the plugin being managed
|
|
id *loader.PluginID
|
|
|
|
// plugin is the plugin instance being managed
|
|
plugin loader.PluginInstance
|
|
|
|
// driver is the driver plugin being managed
|
|
driver drivers.DriverPlugin
|
|
|
|
// pluginLock locks access to the driver and plugin
|
|
pluginLock sync.Mutex
|
|
|
|
// shutdownLock is used to serialize attempts to shutdown
|
|
shutdownLock sync.Mutex
|
|
|
|
// updateNodeFromDriver is the callback used to update the node from fingerprinting
|
|
updateNodeFromDriver UpdateNodeDriverInfoFn
|
|
|
|
// eventHandlerFactory is used to fetch a handler for a task event
|
|
eventHandlerFactory TaskEventHandlerFactory
|
|
|
|
// firstFingerprintCh is used to trigger that we have successfully
|
|
// fingerprinted once. It is used to gate launching the stats collection.
|
|
firstFingerprintCh chan struct{}
|
|
hasFingerprinted bool
|
|
|
|
// lastHealthState is the last known health fingerprinted by the manager
|
|
lastHealthState drivers.HealthState
|
|
lastHealthStateMu sync.Mutex
|
|
}
|
|
|
|
// newInstanceManager returns a new driver instance manager. It is expected that
|
|
// the context passed in the configuration is cancelled in order to shutdown
|
|
// launched goroutines.
|
|
func newInstanceManager(c *instanceManagerConfig) *instanceManager {
|
|
|
|
ctx, cancel := context.WithCancel(c.Ctx)
|
|
i := &instanceManager{
|
|
logger: c.Logger.With("driver", c.ID.Name),
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
loader: c.Loader,
|
|
storeReattach: c.StoreReattach,
|
|
fetchReattach: c.FetchReattach,
|
|
pluginConfig: c.PluginConfig,
|
|
id: c.ID,
|
|
updateNodeFromDriver: c.UpdateNodeFromDriver,
|
|
eventHandlerFactory: c.EventHandlerFactory,
|
|
firstFingerprintCh: make(chan struct{}),
|
|
}
|
|
|
|
go i.run()
|
|
return i
|
|
}
|
|
|
|
// WaitForFirstFingerprint waits until either the plugin fingerprints, the
|
|
// passed context is done, or the plugin instance manager is shutdown.
|
|
func (i *instanceManager) WaitForFirstFingerprint(ctx context.Context) {
|
|
select {
|
|
case <-i.ctx.Done():
|
|
case <-ctx.Done():
|
|
case <-i.firstFingerprintCh:
|
|
}
|
|
}
|
|
|
|
// run is a long lived goroutine that starts the fingerprinting and stats
|
|
// collection goroutine and then shutsdown the plugin on exit.
|
|
func (i *instanceManager) run() {
|
|
// Dispense once to ensure we are given a valid plugin
|
|
if _, err := i.dispense(); err != nil {
|
|
i.logger.Error("dispensing initial plugin failed", "error", err)
|
|
return
|
|
}
|
|
|
|
// Create a waitgroup to block on shutdown for all created goroutines to
|
|
// exit
|
|
var wg sync.WaitGroup
|
|
|
|
// Start the fingerprinter
|
|
wg.Add(1)
|
|
go func() {
|
|
i.fingerprint()
|
|
wg.Done()
|
|
}()
|
|
|
|
// Start event handler
|
|
wg.Add(1)
|
|
go func() {
|
|
i.handleEvents()
|
|
wg.Done()
|
|
}()
|
|
|
|
// Do a final cleanup
|
|
wg.Wait()
|
|
i.cleanup()
|
|
}
|
|
|
|
// dispense is used to dispense a plugin.
|
|
func (i *instanceManager) dispense() (plugin drivers.DriverPlugin, err error) {
|
|
i.pluginLock.Lock()
|
|
defer i.pluginLock.Unlock()
|
|
|
|
// See if we already have a running instance
|
|
if i.plugin != nil && !i.plugin.Exited() {
|
|
return i.driver, nil
|
|
}
|
|
|
|
var pluginInstance loader.PluginInstance
|
|
|
|
if reattach, ok := i.fetchReattach(); ok {
|
|
// Reattach to existing plugin
|
|
pluginInstance, err = i.loader.Reattach(i.id.Name, i.id.PluginType, reattach)
|
|
} else {
|
|
// Get an instance of the plugin
|
|
pluginInstance, err = i.loader.Dispense(i.id.Name, i.id.PluginType, i.pluginConfig, i.logger)
|
|
}
|
|
if err != nil {
|
|
// Retry as the error just indicates the singleton has exited
|
|
if err == singleton.SingletonPluginExited {
|
|
pluginInstance, err = i.loader.Dispense(i.id.Name, i.id.PluginType, i.pluginConfig, i.logger)
|
|
}
|
|
|
|
// If we still have an error there is a real problem
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to start plugin: %v", err)
|
|
}
|
|
}
|
|
|
|
// Convert to a driver plugin
|
|
driver, ok := pluginInstance.Plugin().(drivers.DriverPlugin)
|
|
if !ok {
|
|
pluginInstance.Kill()
|
|
return nil, fmt.Errorf("plugin loaded does not implement the driver interface")
|
|
}
|
|
|
|
// Store the plugin and driver
|
|
i.plugin = pluginInstance
|
|
i.driver = driver
|
|
|
|
// Store the reattach config
|
|
if c, ok := pluginInstance.ReattachConfig(); ok {
|
|
if err := i.storeReattach(c); err != nil {
|
|
i.logger.Error("error storing driver plugin reattach config", "error", err)
|
|
}
|
|
}
|
|
|
|
return driver, nil
|
|
}
|
|
|
|
// cleanup shutsdown the plugin
|
|
func (i *instanceManager) cleanup() {
|
|
i.shutdownLock.Lock()
|
|
i.pluginLock.Lock()
|
|
defer i.pluginLock.Unlock()
|
|
defer i.shutdownLock.Unlock()
|
|
|
|
if i.plugin != nil && !i.plugin.Exited() {
|
|
i.plugin.Kill()
|
|
if err := i.storeReattach(nil); err != nil {
|
|
i.logger.Warn("error clearing plugin reattach config from state store", "error", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// dispenseFingerprintCh dispenses a driver and makes a Fingerprint RPC call
|
|
// to the driver. The fingerprint chan is returned along with the cancel func
|
|
// for the context used in the RPC. This cancel func should always be called
|
|
// when the caller is finished with the channel.
|
|
func (i *instanceManager) dispenseFingerprintCh() (<-chan *drivers.Fingerprint, context.CancelFunc, error) {
|
|
driver, err := i.dispense()
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(i.ctx)
|
|
fingerCh, err := driver.Fingerprint(ctx)
|
|
if err != nil {
|
|
cancel()
|
|
return nil, nil, err
|
|
}
|
|
|
|
return fingerCh, cancel, nil
|
|
}
|
|
|
|
// fingerprint is the main loop for fingerprinting.
|
|
func (i *instanceManager) fingerprint() {
|
|
fpChan, cancel, err := i.dispenseFingerprintCh()
|
|
if err != nil {
|
|
i.logger.Error("failed to dispense driver plugin", "error", err)
|
|
}
|
|
|
|
// backoff and retry used if the RPC is closed by the other end
|
|
var backoff time.Duration
|
|
var retry int
|
|
for {
|
|
if backoff > 0 {
|
|
select {
|
|
case <-time.After(backoff):
|
|
case <-i.ctx.Done():
|
|
cancel()
|
|
return
|
|
}
|
|
}
|
|
|
|
select {
|
|
case <-i.ctx.Done():
|
|
cancel()
|
|
return
|
|
case fp, ok := <-fpChan:
|
|
if ok {
|
|
if fp.Err == nil {
|
|
i.handleFingerprint(fp)
|
|
} else {
|
|
i.logger.Warn("received fingerprint error from driver", "error", fp.Err)
|
|
i.handleFingerprintError()
|
|
}
|
|
continue
|
|
}
|
|
|
|
// if the channel is closed attempt to open a new one
|
|
newFpChan, newCancel, err := i.dispenseFingerprintCh()
|
|
if err != nil {
|
|
i.logger.Warn("error fingerprinting driver", "error", err, "retry", retry)
|
|
i.handleFingerprintError()
|
|
|
|
// Calculate the new backoff
|
|
backoff = (1 << (2 * uint64(retry))) * driverFPBackoffBaseline
|
|
if backoff > driverFPBackoffLimit {
|
|
backoff = driverFPBackoffLimit
|
|
}
|
|
// Increment retry counter
|
|
retry++
|
|
continue
|
|
}
|
|
cancel()
|
|
fpChan = newFpChan
|
|
cancel = newCancel
|
|
|
|
// Reset backoff
|
|
backoff = 0
|
|
retry = 0
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleFingerprintError is called when an error occurred while fingerprinting
|
|
// and will set the driver to unhealthy
|
|
func (i *instanceManager) handleFingerprintError() {
|
|
di := &structs.DriverInfo{
|
|
Healthy: false,
|
|
HealthDescription: "failed to fingerprint driver",
|
|
UpdateTime: time.Now(),
|
|
}
|
|
i.updateNodeFromDriver(i.id.Name, di)
|
|
}
|
|
|
|
// handleFingerprint updates the node with the current fingerprint status
|
|
func (i *instanceManager) handleFingerprint(fp *drivers.Fingerprint) {
|
|
attrs := make(map[string]string, len(fp.Attributes))
|
|
for key, attr := range fp.Attributes {
|
|
attrs[key] = attr.GoString()
|
|
}
|
|
di := &structs.DriverInfo{
|
|
Attributes: attrs,
|
|
Detected: fp.Health != drivers.HealthStateUndetected,
|
|
Healthy: fp.Health == drivers.HealthStateHealthy,
|
|
HealthDescription: fp.HealthDescription,
|
|
UpdateTime: time.Now(),
|
|
}
|
|
i.updateNodeFromDriver(i.id.Name, di)
|
|
|
|
// log detected/undetected state changes after the initial fingerprint
|
|
i.lastHealthStateMu.Lock()
|
|
if i.hasFingerprinted {
|
|
if i.lastHealthState != fp.Health {
|
|
i.logger.Info("driver health state has changed", "previous", i.lastHealthState, "current", fp.Health, "description", fp.HealthDescription)
|
|
}
|
|
}
|
|
i.lastHealthState = fp.Health
|
|
i.lastHealthStateMu.Unlock()
|
|
|
|
// if this is the first fingerprint, mark that we have received it
|
|
if !i.hasFingerprinted {
|
|
i.logger.Trace("initial driver fingerprint", "fingerprint", fp)
|
|
close(i.firstFingerprintCh)
|
|
i.hasFingerprinted = true
|
|
}
|
|
}
|
|
|
|
// getLastHealth returns the most recent HealthState from fingerprinting
|
|
func (i *instanceManager) getLastHealth() drivers.HealthState {
|
|
i.lastHealthStateMu.Lock()
|
|
defer i.lastHealthStateMu.Unlock()
|
|
return i.lastHealthState
|
|
}
|
|
|
|
// dispenseTaskEventsCh dispenses a driver plugin and makes a TaskEvents RPC.
|
|
// The TaskEvent chan and cancel func for the RPC is return. The cancel func must
|
|
// be called by the caller to properly cleanup the context
|
|
func (i *instanceManager) dispenseTaskEventsCh() (<-chan *drivers.TaskEvent, context.CancelFunc, error) {
|
|
driver, err := i.dispense()
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(i.ctx)
|
|
eventsCh, err := driver.TaskEvents(ctx)
|
|
if err != nil {
|
|
cancel()
|
|
return nil, nil, err
|
|
}
|
|
|
|
return eventsCh, cancel, nil
|
|
}
|
|
|
|
// handleEvents is the main loop that receives task events from the driver
|
|
func (i *instanceManager) handleEvents() {
|
|
eventsCh, cancel, err := i.dispenseTaskEventsCh()
|
|
if err != nil {
|
|
i.logger.Error("failed to dispense driver", "error", err)
|
|
}
|
|
|
|
var backoff time.Duration
|
|
var retry int
|
|
for {
|
|
if backoff > 0 {
|
|
select {
|
|
case <-time.After(backoff):
|
|
case <-i.ctx.Done():
|
|
cancel()
|
|
return
|
|
}
|
|
}
|
|
|
|
select {
|
|
case <-i.ctx.Done():
|
|
cancel()
|
|
return
|
|
case ev, ok := <-eventsCh:
|
|
if ok {
|
|
i.handleEvent(ev)
|
|
continue
|
|
}
|
|
|
|
// if the channel is closed attempt to open a new one
|
|
newEventsChan, newCancel, err := i.dispenseTaskEventsCh()
|
|
if err != nil {
|
|
i.logger.Warn("failed to receive task events, retrying", "error", err, "retry", retry)
|
|
|
|
// Calculate the new backoff
|
|
backoff = (1 << (2 * uint64(retry))) * driverFPBackoffBaseline
|
|
if backoff > driverFPBackoffLimit {
|
|
backoff = driverFPBackoffLimit
|
|
}
|
|
retry++
|
|
continue
|
|
}
|
|
cancel()
|
|
eventsCh = newEventsChan
|
|
cancel = newCancel
|
|
|
|
// Reset backoff
|
|
backoff = 0
|
|
retry = 0
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleEvent looks up the event handler(s) for the event and runs them
|
|
func (i *instanceManager) handleEvent(ev *drivers.TaskEvent) {
|
|
// Do not emit that the plugin is shutdown
|
|
if ev.Err != nil && ev.Err == bstructs.ErrPluginShutdown {
|
|
return
|
|
}
|
|
|
|
if handler := i.eventHandlerFactory(ev.AllocID, ev.TaskName); handler != nil {
|
|
i.logger.Trace("task event received", "event", ev)
|
|
handler(ev)
|
|
return
|
|
}
|
|
|
|
i.logger.Warn("no handler registered for event", "event", ev)
|
|
}
|