open-nomad/client/devicemanager/instance.go

529 lines
13 KiB
Go

package devicemanager
import (
"context"
"fmt"
"sync"
"time"
log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/helper/pluginutils/loader"
"github.com/hashicorp/nomad/helper/pluginutils/singleton"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/base"
bstructs "github.com/hashicorp/nomad/plugins/base/structs"
"github.com/hashicorp/nomad/plugins/device"
)
const (
// statsBackoffBaseline is the baseline time for exponential backoff while
// collecting device stats. It is the delay used for the first retry after
// the plugin reports a stats error; later retries scale up from it.
statsBackoffBaseline = 5 * time.Second
// statsBackoffLimit is the limit of the exponential backoff for collecting
// device statistics. Retry delays that would exceed it are clamped to it.
statsBackoffLimit = 30 * time.Minute
)
// instanceManagerConfig configures a device instance manager. Its fields are
// copied into the instanceManager created by newInstanceManager.
type instanceManagerConfig struct {
// Logger is the logger used by the device instance manager. The manager
// derives its own logger from it, tagged with the plugin name.
Logger log.Logger
// Ctx is used to shutdown the device instance manager. The manager derives
// a cancellable child context from it.
Ctx context.Context
// Loader is the plugin loader used to dispense the device plugin.
Loader loader.PluginCatalog
// StoreReattach is used to store a plugin's reattach config. It is called
// with nil when the manager kills the plugin during cleanup.
StoreReattach StorePluginReattachFn
// PluginConfig is the config passed to the launched plugins.
PluginConfig *base.AgentConfig
// Id is the ID of the plugin being managed.
Id *loader.PluginID
// FingerprintOutCh is used to emit new fingerprinted devices. The manager
// sends on it without blocking, so a notification is dropped when the
// channel is not ready to receive.
FingerprintOutCh chan<- struct{}
// StatsInterval is the interval at which we collect statistics. It is
// passed to the device plugin when stats collection is started.
StatsInterval time.Duration
}
// instanceManager is used to manage a single device plugin. It dispenses the
// plugin, tracks the devices the plugin fingerprints, collects the plugin's
// device statistics, and kills the plugin when its run goroutine exits.
type instanceManager struct {
// logger is the logger used by the device instance manager
logger log.Logger
// ctx is used to shutdown the device manager. It is derived from the
// context passed in the configuration and is done once cancel is called.
ctx context.Context
// cancel is used to shutdown management of this device plugin
cancel context.CancelFunc
// loader is the plugin loader
loader loader.PluginCatalog
// storeReattach is used to store a plugin's reattach config
storeReattach StorePluginReattachFn
// pluginConfig is the config passed to the launched plugins
pluginConfig *base.AgentConfig
// id is the ID of the plugin being managed
id *loader.PluginID
// fingerprintOutCh is used to emit new fingerprinted devices. Sends are
// non-blocking.
fingerprintOutCh chan<- struct{}
// plugin is the plugin instance being managed
plugin loader.PluginInstance
// device is the device plugin being managed
device device.DevicePlugin
// pluginLock locks access to the device and plugin. It is held while
// dispensing and while cleaning up.
pluginLock sync.Mutex
// shutdownLock is used to serialize attempts to shutdown
shutdownLock sync.Mutex
// devices is the set of fingerprinted devices
devices []*device.DeviceGroup
// deviceLock guards devices and hasFingerprinted
deviceLock sync.RWMutex
// statsInterval is the interval at which we collect statistics.
statsInterval time.Duration
// deviceStats is the set of statistics objects per devices
deviceStats []*device.DeviceGroupStats
// deviceStatsLock guards deviceStats
deviceStatsLock sync.RWMutex
// firstFingerprintCh is used to trigger that we have successfully
// fingerprinted once. It is used to gate launching the stats collection.
// It is closed, never sent on.
firstFingerprintCh chan struct{}
// hasFingerprinted records that firstFingerprintCh has been closed so it
// is only closed once.
hasFingerprinted bool
}
// newInstanceManager builds the manager for a single device plugin from the
// passed configuration and immediately launches its run goroutine. The manager
// works on a cancellable context derived from the configured one, so cancelling
// the configuration's context shuts down the goroutines started here.
func newInstanceManager(c *instanceManagerConfig) *instanceManager {
	managerCtx, stop := context.WithCancel(c.Ctx)

	mgr := new(instanceManager)
	mgr.logger = c.Logger.With("plugin", c.Id.Name)
	mgr.ctx = managerCtx
	mgr.cancel = stop
	mgr.loader = c.Loader
	mgr.storeReattach = c.StoreReattach
	mgr.pluginConfig = c.PluginConfig
	mgr.id = c.Id
	mgr.fingerprintOutCh = c.FingerprintOutCh
	mgr.statsInterval = c.StatsInterval
	mgr.firstFingerprintCh = make(chan struct{})

	go mgr.run()
	return mgr
}
// HasDevices reports whether this instance manages every device requested in
// d. A fingerprinted group qualifies when its name, type and vendor equal
// those of d and it contains all of d's device IDs.
func (i *instanceManager) HasDevices(d *structs.AllocatedDeviceResource) bool {
	i.deviceLock.RLock()
	defer i.deviceLock.RUnlock()

	for _, group := range i.devices {
		sameGroup := group.Name == d.Name && group.Type == d.Type && group.Vendor == d.Vendor
		if !sameGroup {
			continue
		}

		// Index the instance IDs of this group for constant time lookups.
		known := make(map[string]struct{}, len(group.Devices))
		for _, instance := range group.Devices {
			known[instance.ID] = struct{}{}
		}

		// The group only qualifies if none of the requested IDs is missing.
		missing := false
		for _, wanted := range d.DeviceIDs {
			if _, found := known[wanted]; !found {
				missing = true
				break
			}
		}
		if !missing {
			return true
		}
	}

	return false
}
// AllStats returns the most recently stored statistics for every device group
// reported by the device plugin. The stored slice itself is returned, not a
// copy.
func (i *instanceManager) AllStats() []*device.DeviceGroupStats {
	i.deviceStatsLock.RLock()
	stats := i.deviceStats
	i.deviceStatsLock.RUnlock()

	return stats
}
// DeviceStats returns the statistics for the requested devices. The result is
// a new group keyed by the device IDs in d; an ID the plugin reported no
// statistics for maps to nil. Nil is returned when no stored group has the
// vendor, type and name of d.
func (i *instanceManager) DeviceStats(d *structs.AllocatedDeviceResource) *device.DeviceGroupStats {
	i.deviceStatsLock.RLock()
	defer i.deviceStatsLock.RUnlock()

	for _, candidate := range i.deviceStats {
		matches := candidate.Vendor == d.Vendor && candidate.Type == d.Type && candidate.Name == d.Name
		if !matches {
			continue
		}

		// Only the first matching group is consulted; copy out the
		// instances that were asked for.
		perInstance := make(map[string]*device.DeviceStats, len(d.DeviceIDs))
		for _, deviceID := range d.DeviceIDs {
			perInstance[deviceID] = candidate.InstanceStats[deviceID]
		}

		return &device.DeviceGroupStats{
			Vendor:        d.Vendor,
			Type:          d.Type,
			Name:          d.Name,
			InstanceStats: perInstance,
		}
	}

	return nil
}
// Reserve asks the device plugin to reserve the device IDs listed in d and
// returns the plugin's reservation. If no plugin can be dispensed the failure
// is logged and returned.
func (i *instanceManager) Reserve(d *structs.AllocatedDeviceResource) (*device.ContainerReservation, error) {
	plugin, dispenseErr := i.dispense()
	if dispenseErr != nil {
		i.logger.Error("dispensing plugin failed", "error", dispenseErr)
		return nil, dispenseErr
	}

	// Forward the request to the plugin
	return plugin.Reserve(d.DeviceIDs)
}
// Devices returns the device groups stored by the latest fingerprint. The
// stored slice itself is returned, not a copy.
func (i *instanceManager) Devices() []*device.DeviceGroup {
	i.deviceLock.RLock()
	detected := i.devices
	i.deviceLock.RUnlock()

	return detected
}
// WaitForFirstFingerprint blocks until the first of the following happens: the
// plugin has fingerprinted once, the passed context is done, or the manager's
// own context is done.
func (i *instanceManager) WaitForFirstFingerprint(ctx context.Context) {
	select {
	case <-i.firstFingerprintCh:
		return
	case <-ctx.Done():
		return
	case <-i.ctx.Done():
		return
	}
}
// run is a long lived goroutine that starts the fingerprinting and stats
// collection goroutines and then shuts down the plugin on exit.
//
// Stats collection is only started once the first fingerprint has been
// received. If the manager's context is done before that, stats collection is
// skipped. In both cases run waits for the goroutines it started before
// cleaning up the plugin.
func (i *instanceManager) run() {
	// Dispense once to ensure we are given a valid plugin
	if _, err := i.dispense(); err != nil {
		i.logger.Error("dispensing initial plugin failed", "error", err)

		// Nothing else will ever cancel the derived context on this path.
		// Cancel it so it is released and so callers blocked in
		// WaitForFirstFingerprint observe that the manager is gone.
		i.cancel()
		return
	}

	// Create a waitgroup to block on shutdown for all created goroutines to
	// exit
	var wg sync.WaitGroup

	// Start the fingerprinter
	wg.Add(1)
	go func() {
		defer wg.Done()
		i.fingerprint()
	}()

	// Wait for a valid result before starting stats collection
	select {
	case <-i.ctx.Done():
		// Shutting down before the first fingerprint; skip stats.
	case <-i.firstFingerprintCh:
		wg.Add(1)
		go func() {
			defer wg.Done()
			i.collectStats()
		}()
	}

	// Do a final cleanup once every goroutine has exited
	wg.Wait()
	i.cleanup()
}
// dispense is used to dispense a plugin. It returns the cached device plugin
// while the current plugin instance has not exited; otherwise it asks the
// loader for a new instance, stores it, and records its reattach config.
//
// A dispense that fails with singleton.SingletonPluginExited is retried once,
// since that error only indicates the previous singleton has exited. An error
// is returned if the plugin cannot be started or if the loaded plugin does not
// implement device.DevicePlugin, in which case the instance is killed.
func (i *instanceManager) dispense() (plugin device.DevicePlugin, err error) {
	i.pluginLock.Lock()
	defer i.pluginLock.Unlock()

	// See if we already have a running instance
	if i.plugin != nil && !i.plugin.Exited() {
		return i.device, nil
	}

	// Get an instance of the plugin
	pluginInstance, err := i.loader.Dispense(i.id.Name, i.id.PluginType, i.pluginConfig, i.logger)
	if err == singleton.SingletonPluginExited {
		// Retry as the error just indicates the singleton has exited
		pluginInstance, err = i.loader.Dispense(i.id.Name, i.id.PluginType, i.pluginConfig, i.logger)
	}

	// If we still have an error there is a real problem
	if err != nil {
		return nil, fmt.Errorf("failed to start plugin: %v", err)
	}

	// Convert to a device plugin. The local is deliberately not named
	// "device" so that it does not shadow the imported package.
	devicePlugin, ok := pluginInstance.Plugin().(device.DevicePlugin)
	if !ok {
		pluginInstance.Kill()
		return nil, fmt.Errorf("plugin loaded does not implement the device interface")
	}

	// Store the plugin and device
	i.plugin = pluginInstance
	i.device = devicePlugin

	// Store the reattach config
	if c, ok := pluginInstance.ReattachConfig(); ok {
		i.storeReattach(c)
	}

	return devicePlugin, nil
}
// cleanup shuts down the plugin. If a plugin instance is stored and has not
// exited it is killed and the stored reattach config is cleared by storing
// nil. Both the shutdown and plugin locks are held for the duration.
func (i *instanceManager) cleanup() {
	i.shutdownLock.Lock()
	i.pluginLock.Lock()
	defer func() {
		i.shutdownLock.Unlock()
		i.pluginLock.Unlock()
	}()

	// Nothing to do when there is no live plugin
	if i.plugin == nil || i.plugin.Exited() {
		return
	}

	i.plugin.Kill()
	i.storeReattach(nil)
}
// fingerprint is a long lived routine used to fingerprint the device. It
// dispenses the plugin, opens the plugin's fingerprint stream and stores every
// valid response via handleFingerprint.
//
// The routine returns when the manager's context is done. It also returns,
// after cancelling the manager through handleFingerprintError, when the stream
// cannot be started, the stream is closed, the plugin reports an error, or the
// reported devices fail validation. If the plugin reports that it has shut
// down, the plugin is dispensed again and fingerprinting restarts.
func (i *instanceManager) fingerprint() {
START:
	// Get a device plugin
	devicePlugin, err := i.dispense()
	if err != nil {
		i.logger.Error("dispensing plugin failed", "error", err)
		i.cancel()
		return
	}

	// Start fingerprinting
	fingerprintCh, err := devicePlugin.Fingerprint(i.ctx)
	if err != nil {
		// A disabled plugin is expected, so it is logged at a lower level
		if err == device.ErrPluginDisabled {
			i.logger.Info("fingerprinting failed: plugin is not enabled")
		} else {
			i.logger.Error("fingerprinting failed", "error", err)
		}
		i.handleFingerprintError()
		return
	}

	var fresp *device.FingerprintResponse
	var ok bool
	for {
		select {
		case <-i.ctx.Done():
			return
		case fresp, ok = <-fingerprintCh:
		}

		if !ok {
			i.logger.Trace("exiting since fingerprinting gracefully shutdown")
			i.handleFingerprintError()
			return
		}

		// Guard against error by the plugin
		if fresp == nil {
			continue
		}

		// Handle any errors
		if fresp.Error != nil {
			if fresp.Error == bstructs.ErrPluginShutdown {
				i.logger.Error("plugin exited unexpectedly")
				goto START
			}

			// Log the error carried by the response; the outer err is
			// always nil once the stream has been opened.
			i.logger.Error("fingerprinting returned an error", "error", fresp.Error)
			i.handleFingerprintError()
			return
		}

		if err := i.handleFingerprint(fresp); err != nil {
			i.logger.Error("returned devices failed fingerprinting", "error", err)

			// Cancel the context so we cleanup all goroutines, and stop
			// here so no further responses are stored after shutdown.
			i.handleFingerprintError()
			return
		}
	}
}
// handleFingerprintError cancels the manager's context. If devices were
// fingerprinted before, the stored devices are cleared first and a non-blocking
// notification is sent on the fingerprint output channel.
func (i *instanceManager) handleFingerprintError() {
	i.deviceLock.Lock()
	defer i.deviceLock.Unlock()

	// Cancel the context so we cleanup all goroutines. Deferred after the
	// unlock so that it runs before the lock is released.
	defer i.cancel()

	// Nothing was ever published, so there is nothing to clear
	if !i.hasFingerprinted {
		return
	}

	// Drop the stored devices and signal the change without blocking
	i.devices = nil
	select {
	case i.fingerprintOutCh <- struct{}{}:
	default:
	}
}
// handleFingerprint validates the device groups in the response, stores them
// as the current devices and sends a non-blocking notification on the
// fingerprint output channel. The first stored response also closes
// firstFingerprintCh. A response whose Devices is nil is ignored. If any group
// fails validation nothing is stored and the combined validation error is
// returned.
func (i *instanceManager) handleFingerprint(f *device.FingerprintResponse) error {
	// A response without devices carries nothing to store
	if f.Devices == nil {
		return nil
	}

	// Collect the validation failures of every group
	var problems multierror.Error
	for idx, group := range f.Devices {
		err := group.Validate()
		if err == nil {
			continue
		}
		prefix := fmt.Sprintf("device group %d: ", idx)
		multierror.Append(&problems, multierror.Prefix(err, prefix))
	}
	if err := problems.ErrorOrNil(); err != nil {
		return err
	}

	i.deviceLock.Lock()
	defer i.deviceLock.Unlock()

	// Replace the stored devices with the new set
	i.devices = f.Devices

	// Release anyone waiting on the first fingerprint, exactly once
	if !i.hasFingerprinted {
		close(i.firstFingerprintCh)
		i.hasFingerprinted = true
	}

	// Signal that there is new data without blocking
	select {
	case i.fingerprintOutCh <- struct{}{}:
	default:
	}

	return nil
}
// collectStats is a long lived goroutine for collecting device statistics. It
// dispenses the plugin, opens the plugin's stats stream with the configured
// interval and stores the groups of every response that carries them.
//
// When a response carries an error the stream is reopened after an exponential
// backoff: the delay starts at statsBackoffBaseline, quadruples with every
// consecutive failure and is capped at statsBackoffLimit. A successful response
// resets the backoff. If the plugin reports that it has shut down, the plugin
// is dispensed again immediately. The routine returns when the manager's
// context is done, the stream cannot be started, or the stream is closed. If
// no plugin can be dispensed it cancels the manager's context and returns.
func (i *instanceManager) collectStats() {
	attempt := 0

START:
	// Get a device plugin
	devicePlugin, err := i.dispense()
	if err != nil {
		i.logger.Error("dispensing plugin failed", "error", err)
		i.cancel()
		return
	}

	// Start stats collection
	statsCh, err := devicePlugin.Stats(i.ctx, i.statsInterval)
	if err != nil {
		i.logger.Error("stats collection failed", "error", err)
		return
	}

	var sresp *device.StatsResponse
	var ok bool
	for {
		select {
		case <-i.ctx.Done():
			return
		case sresp, ok = <-statsCh:
		}

		if !ok {
			i.logger.Trace("exiting since stats gracefully shutdown")
			return
		}

		// Guard against error by the plugin
		if sresp == nil {
			continue
		}

		// Handle any errors
		if sresp.Error != nil {
			if sresp.Error == bstructs.ErrPluginShutdown {
				i.logger.Error("plugin exited unexpectedly")
				goto START
			}

			// Retry with an exponential backoff of baseline * 4^attempt,
			// capped at the limit. The delay is grown step by step and the
			// growth stops once the limit is reached, so a long run of
			// failures cannot overflow the duration and wrap around to a
			// zero or negative delay.
			backoff := statsBackoffBaseline
			for n := 0; n < attempt && backoff < statsBackoffLimit; n++ {
				backoff *= 4
			}
			if backoff > statsBackoffLimit {
				backoff = statsBackoffLimit
			}
			attempt++

			// Log the error carried by the response; the outer err is
			// always nil once the stream has been opened.
			i.logger.Error("stats returned an error", "error", sresp.Error, "retry", backoff)

			select {
			case <-i.ctx.Done():
				return
			case <-time.After(backoff):
				goto START
			}
		}

		// Reset the attempt since we got statistics
		attempt = 0

		// Store the new stats
		if sresp.Groups != nil {
			i.deviceStatsLock.Lock()
			i.deviceStats = sresp.Groups
			i.deviceStatsLock.Unlock()
		}
	}
}