open-nomad/client/pluginmanager/drivermanager/instance.go

463 lines
13 KiB
Go
Raw Normal View History

package drivermanager
import (
"context"
"fmt"
"sync"
"time"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/plugins/shared/loader"
"github.com/hashicorp/nomad/plugins/shared/singleton"
)
const (
// driverFPBackoffBaseline is the baseline time for exponential backoff while
// fingerprinting a driver.
driverFPBackoffBaseline = 5 * time.Second
// driverFPBackoffLimit is the limit of the exponential backoff for fingerprinting
// a driver.
driverFPBackoffLimit = 2 * time.Minute
)
// instanceManagerConfig configures a driver instance manager
type instanceManagerConfig struct {
// Logger is the logger used by the driver instance manager
Logger log.Logger
// Ctx is used to shutdown the driver instance manager
Ctx context.Context
// Loader is the plugin loader
Loader loader.PluginCatalog
// StoreReattach is used to store a plugins reattach config
StoreReattach StorePluginReattachFn
// FetchReattach is used to retrieve a plugin's reattach config
FetchReattach FetchPluginReattachFn
// PluginConfig is the config passed to the launched plugins
PluginConfig *base.AgentConfig
// ID is the ID of the plugin being managed
ID *loader.PluginID
// updateNodeFromDriver is the callback used to update the node from fingerprinting
UpdateNodeFromDriver UpdateNodeDriverInfoFn
// EventHandlerFactory is used to fetch a task event handler
EventHandlerFactory TaskEventHandlerFactory
}
// instanceManager is used to manage a single driver plugin
type instanceManager struct {
// logger is the logger used by the driver instance manager
logger log.Logger
// ctx is used to shutdown the driver manager
ctx context.Context
// cancel is used to shutdown management of this driver plugin
cancel context.CancelFunc
// loader is the plugin loader
loader loader.PluginCatalog
// storeReattach is used to store a plugins reattach config
storeReattach StorePluginReattachFn
// fetchReattach is used to retrieve a plugin's reattach config
fetchReattach FetchPluginReattachFn
// pluginConfig is the config passed to the launched plugins
pluginConfig *base.AgentConfig
// id is the ID of the plugin being managed
id *loader.PluginID
// plugin is the plugin instance being managed
plugin loader.PluginInstance
// driver is the driver plugin being managed
driver drivers.DriverPlugin
// pluginLock locks access to the driver and plugin
pluginLock sync.Mutex
// shutdownLock is used to serialize attempts to shutdown
shutdownLock sync.Mutex
// updateNodeFromDriver is the callback used to update the node from fingerprinting
updateNodeFromDriver UpdateNodeDriverInfoFn
// eventHandlerFactory is used to fetch a handler for a task event
eventHandlerFactory TaskEventHandlerFactory
// firstFingerprintCh is used to trigger that we have successfully
// fingerprinted once. It is used to gate launching the stats collection.
firstFingerprintCh chan struct{}
hasFingerprinted bool
// lastHealthState is the last known health fingerprinted by the manager
lastHealthState drivers.HealthState
lastHealthStateMu sync.Mutex
}
// newInstanceManager returns a new driver instance manager. It is expected that
// the context passed in the configuration is cancelled in order to shutdown
// launched goroutines.
func newInstanceManager(c *instanceManagerConfig) *instanceManager {
ctx, cancel := context.WithCancel(c.Ctx)
i := &instanceManager{
logger: c.Logger.With("driver", c.ID.Name),
ctx: ctx,
cancel: cancel,
loader: c.Loader,
storeReattach: c.StoreReattach,
fetchReattach: c.FetchReattach,
pluginConfig: c.PluginConfig,
id: c.ID,
updateNodeFromDriver: c.UpdateNodeFromDriver,
eventHandlerFactory: c.EventHandlerFactory,
firstFingerprintCh: make(chan struct{}),
}
go i.run()
return i
}
// WaitForFirstFingerprint waits until either the plugin fingerprints, the
// passed context is done, or the plugin instance manager is shutdown.
func (i *instanceManager) WaitForFirstFingerprint(ctx context.Context) {
select {
case <-i.ctx.Done():
case <-ctx.Done():
case <-i.firstFingerprintCh:
}
}
// run is a long lived goroutine that starts the fingerprinting and stats
// collection goroutine and then shutsdown the plugin on exit.
func (i *instanceManager) run() {
// Dispense once to ensure we are given a valid plugin
if _, err := i.dispense(); err != nil {
i.logger.Error("dispensing initial plugin failed", "error", err)
return
}
// Create a waitgroup to block on shutdown for all created goroutines to
// exit
var wg sync.WaitGroup
// Start the fingerprinter
wg.Add(1)
go func() {
i.fingerprint()
wg.Done()
}()
// Start event handler
wg.Add(1)
go func() {
i.handleEvents()
wg.Done()
}()
// Do a final cleanup
wg.Wait()
i.cleanup()
}
// dispense is used to dispense a plugin.
func (i *instanceManager) dispense() (plugin drivers.DriverPlugin, err error) {
i.pluginLock.Lock()
defer i.pluginLock.Unlock()
// See if we already have a running instance
if i.plugin != nil && !i.plugin.Exited() {
return i.driver, nil
}
var pluginInstance loader.PluginInstance
if reattach, ok := i.fetchReattach(); ok {
// Reattach to existing plugin
pluginInstance, err = i.loader.Reattach(i.id.Name, i.id.PluginType, reattach)
} else {
// Get an instance of the plugin
pluginInstance, err = i.loader.Dispense(i.id.Name, i.id.PluginType, i.pluginConfig, i.logger)
}
if err != nil {
// Retry as the error just indicates the singleton has exited
if err == singleton.SingletonPluginExited {
pluginInstance, err = i.loader.Dispense(i.id.Name, i.id.PluginType, i.pluginConfig, i.logger)
}
// If we still have an error there is a real problem
if err != nil {
return nil, fmt.Errorf("failed to start plugin: %v", err)
}
}
// Convert to a driver plugin
driver, ok := pluginInstance.Plugin().(drivers.DriverPlugin)
if !ok {
pluginInstance.Kill()
return nil, fmt.Errorf("plugin loaded does not implement the driver interface")
}
// Store the plugin and driver
i.plugin = pluginInstance
i.driver = driver
// Store the reattach config
if c, ok := pluginInstance.ReattachConfig(); ok {
if err := i.storeReattach(c); err != nil {
i.logger.Error("error storing driver plugin reattach config", "error", err)
}
}
return driver, nil
}
// cleanup shutsdown the plugin
func (i *instanceManager) cleanup() {
i.shutdownLock.Lock()
i.pluginLock.Lock()
defer i.pluginLock.Unlock()
defer i.shutdownLock.Unlock()
if i.plugin != nil && !i.plugin.Exited() {
i.plugin.Kill()
if err := i.storeReattach(nil); err != nil {
i.logger.Warn("error clearing plugin reattach config from state store", "error", err)
}
}
}
// dispenseFingerprintCh dispenses a driver and makes a Fingerprint RPC call
// to the driver. The fingerprint chan is returned along with the cancel func
// for the context used in the RPC. This cancel func should always be called
// when the caller is finished with the channel.
func (i *instanceManager) dispenseFingerprintCh() (<-chan *drivers.Fingerprint, context.CancelFunc, error) {
driver, err := i.dispense()
if err != nil {
return nil, nil, err
}
ctx, cancel := context.WithCancel(i.ctx)
fingerCh, err := driver.Fingerprint(ctx)
if err != nil {
cancel()
return nil, nil, err
}
return fingerCh, cancel, nil
}
// fingerprint is the main loop for fingerprinting.
func (i *instanceManager) fingerprint() {
fpChan, cancel, err := i.dispenseFingerprintCh()
if err != nil {
i.logger.Error("failed to dispense driver plugin", "error", err)
}
// backoff and retry used if the RPC is closed by the other end
var backoff time.Duration
var retry int
for {
if backoff > 0 {
select {
case <-time.After(backoff):
case <-i.ctx.Done():
cancel()
return
}
}
select {
case <-i.ctx.Done():
cancel()
return
case fp, ok := <-fpChan:
if ok {
if fp.Err == nil {
i.handleFingerprint(fp)
} else {
i.logger.Warn("received fingerprint error from driver", "error", fp.Err)
i.handleFingerprintError()
}
continue
}
// if the channel is closed attempt to open a new one
newFpChan, newCancel, err := i.dispenseFingerprintCh()
if err != nil {
i.logger.Warn("error fingerprinting driver", "error", err, "retry", retry)
i.handleFingerprintError()
// Calculate the new backoff
backoff = (1 << (2 * uint64(retry))) * driverFPBackoffBaseline
if backoff > driverFPBackoffLimit {
backoff = driverFPBackoffLimit
}
// Increment retry counter
retry++
continue
}
cancel()
fpChan = newFpChan
cancel = newCancel
// Reset backoff
backoff = 0
retry = 0
}
}
}
// handleFingerprintError is called when an error occurred while fingerprinting
// and will set the driver to unhealthy
func (i *instanceManager) handleFingerprintError() {
di := &structs.DriverInfo{
Healthy: false,
HealthDescription: "failed to fingerprint driver",
UpdateTime: time.Now(),
}
i.updateNodeFromDriver(i.id.Name, di)
}
// handleFingerprint updates the node with the current fingerprint status
func (i *instanceManager) handleFingerprint(fp *drivers.Fingerprint) {
attrs := make(map[string]string, len(fp.Attributes))
for key, attr := range fp.Attributes {
attrs[key] = attr.GoString()
}
di := &structs.DriverInfo{
Attributes: attrs,
Detected: fp.Health != drivers.HealthStateUndetected,
Healthy: fp.Health == drivers.HealthStateHealthy,
HealthDescription: fp.HealthDescription,
UpdateTime: time.Now(),
}
i.updateNodeFromDriver(i.id.Name, di)
// log detected/undetected state changes after the initial fingerprint
i.lastHealthStateMu.Lock()
if i.hasFingerprinted {
if i.lastHealthState != fp.Health {
i.logger.Info("driver health state has changed", "previous", i.lastHealthState, "current", fp.Health, "description", fp.HealthDescription)
}
}
i.lastHealthState = fp.Health
i.lastHealthStateMu.Unlock()
// if this is the first fingerprint, mark that we have received it
if !i.hasFingerprinted {
i.logger.Trace("initial driver fingerprint", "fingerprint", fp)
close(i.firstFingerprintCh)
i.hasFingerprinted = true
}
}
// getLastHealth returns the most recent HealthState from fingerprinting
func (i *instanceManager) getLastHealth() drivers.HealthState {
i.lastHealthStateMu.Lock()
defer i.lastHealthStateMu.Unlock()
return i.lastHealthState
}
// dispenseTaskEventsCh dispenses a driver plugin and makes a TaskEvents RPC.
// The TaskEvent chan and cancel func for the RPC is return. The cancel func must
// be called by the caller to properly cleanup the context
func (i *instanceManager) dispenseTaskEventsCh() (<-chan *drivers.TaskEvent, context.CancelFunc, error) {
driver, err := i.dispense()
if err != nil {
return nil, nil, err
}
ctx, cancel := context.WithCancel(i.ctx)
eventsCh, err := driver.TaskEvents(ctx)
if err != nil {
cancel()
return nil, nil, err
}
return eventsCh, cancel, nil
}
// handleEvents is the main loop that receives task events from the driver
func (i *instanceManager) handleEvents() {
eventsCh, cancel, err := i.dispenseTaskEventsCh()
if err != nil {
i.logger.Error("failed to dispense driver", "error", err)
}
var backoff time.Duration
var retry int
for {
if backoff > 0 {
select {
case <-time.After(backoff):
case <-i.ctx.Done():
cancel()
return
}
}
select {
case <-i.ctx.Done():
cancel()
return
case ev, ok := <-eventsCh:
if ok {
i.handleEvent(ev)
continue
}
// if the channel is closed attempt to open a new one
newEventsChan, newCancel, err := i.dispenseTaskEventsCh()
if err != nil {
i.logger.Warn("failed to receive task events, retrying", "error", err, "retry", retry)
// Calculate the new backoff
backoff = (1 << (2 * uint64(retry))) * driverFPBackoffBaseline
if backoff > driverFPBackoffLimit {
backoff = driverFPBackoffLimit
}
retry++
continue
}
cancel()
eventsCh = newEventsChan
cancel = newCancel
// Reset backoff
backoff = 0
retry = 0
}
}
}
// handleEvent looks up the event handler(s) for the event and runs them
func (i *instanceManager) handleEvent(ev *drivers.TaskEvent) {
// Do not emit that the plugin is shutdown
if ev.Err != nil && ev.Err == base.ErrPluginShutdown {
return
}
if handler := i.eventHandlerFactory(ev.AllocID, ev.TaskName); handler != nil {
i.logger.Trace("task event received", "event", ev)
handler(ev)
return
}
i.logger.Warn("no handler registered for event", "event", ev)
}