259 lines
7.5 KiB
Go
259 lines
7.5 KiB
Go
package client
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/hashicorp/nomad/client/devicemanager"
|
|
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
)
|
|
|
|
// batchFirstFingerprintsTimeout caps how long batchFirstFingerprints waits
// for initial fingerprinting to complete before it sends the batched Node
// update anyway.
var batchFirstFingerprintsTimeout = 50 * time.Second
|
|
|
|
// batchFirstFingerprints waits for the first fingerprint response from all
|
|
// plugin managers and sends a single Node update for all fingerprints. It
|
|
// should only ever be called once
|
|
func (c *Client) batchFirstFingerprints() {
|
|
ctx, cancel := context.WithTimeout(context.Background(), batchFirstFingerprintsTimeout)
|
|
defer cancel()
|
|
|
|
ch, err := c.pluginManagers.WaitForFirstFingerprint(ctx)
|
|
if err != nil {
|
|
c.logger.Warn("failed to batch initial fingerprint updates, switching to incemental updates")
|
|
goto SEND_BATCH
|
|
}
|
|
|
|
// Wait for fingerprinting to complete or timeout before processing batches
|
|
select {
|
|
case <-ch:
|
|
case <-ctx.Done():
|
|
}
|
|
|
|
SEND_BATCH:
|
|
c.configLock.Lock()
|
|
defer c.configLock.Unlock()
|
|
|
|
// driver node updates
|
|
var driverChanged bool
|
|
c.batchNodeUpdates.batchDriverUpdates(func(driver string, info *structs.DriverInfo) {
|
|
if c.updateNodeFromDriverLocked(driver, info) {
|
|
c.config.Node.Drivers[driver] = info
|
|
if c.config.Node.Drivers[driver].UpdateTime.IsZero() {
|
|
c.config.Node.Drivers[driver].UpdateTime = time.Now()
|
|
}
|
|
driverChanged = true
|
|
}
|
|
})
|
|
|
|
// device node updates
|
|
var devicesChanged bool
|
|
c.batchNodeUpdates.batchDevicesUpdates(func(devices []*structs.NodeDeviceResource) {
|
|
if c.updateNodeFromDevicesLocked(devices) {
|
|
devicesChanged = true
|
|
}
|
|
})
|
|
|
|
// only update the node if changes occurred
|
|
if driverChanged || devicesChanged {
|
|
c.updateNodeLocked()
|
|
}
|
|
|
|
close(c.fpInitialized)
|
|
}
|
|
|
|
// updateNodeFromDriver receives a DriverInfo struct for the driver and updates
|
|
// the node accordingly
|
|
func (c *Client) updateNodeFromDriver(name string, info *structs.DriverInfo) {
|
|
c.configLock.Lock()
|
|
defer c.configLock.Unlock()
|
|
|
|
if c.updateNodeFromDriverLocked(name, info) {
|
|
c.config.Node.Drivers[name] = info
|
|
if c.config.Node.Drivers[name].UpdateTime.IsZero() {
|
|
c.config.Node.Drivers[name].UpdateTime = time.Now()
|
|
}
|
|
c.updateNodeLocked()
|
|
}
|
|
}
|
|
|
|
// updateNodeFromDriverLocked makes the changes to the node from a driver update
|
|
// but does not send the update to the server. c.configLock must be held before
|
|
// calling this func
|
|
func (c *Client) updateNodeFromDriverLocked(name string, info *structs.DriverInfo) bool {
|
|
var hasChanged bool
|
|
|
|
hadDriver := c.config.Node.Drivers[name] != nil
|
|
if !hadDriver {
|
|
// If the driver info has not yet been set, do that here
|
|
hasChanged = true
|
|
for attrName, newVal := range info.Attributes {
|
|
c.config.Node.Attributes[attrName] = newVal
|
|
}
|
|
} else {
|
|
oldVal := c.config.Node.Drivers[name]
|
|
// The driver info has already been set, fix it up
|
|
if oldVal.Detected != info.Detected {
|
|
hasChanged = true
|
|
}
|
|
|
|
// If health state has change, trigger node event
|
|
if oldVal.Healthy != info.Healthy || oldVal.HealthDescription != info.HealthDescription {
|
|
hasChanged = true
|
|
if info.HealthDescription != "" {
|
|
event := &structs.NodeEvent{
|
|
Subsystem: "Driver",
|
|
Message: info.HealthDescription,
|
|
Timestamp: time.Now(),
|
|
Details: map[string]string{"driver": name},
|
|
}
|
|
c.triggerNodeEvent(event)
|
|
}
|
|
}
|
|
|
|
for attrName, newVal := range info.Attributes {
|
|
oldVal := c.config.Node.Drivers[name].Attributes[attrName]
|
|
if oldVal == newVal {
|
|
continue
|
|
}
|
|
|
|
hasChanged = true
|
|
if newVal == "" {
|
|
delete(c.config.Node.Attributes, attrName)
|
|
} else {
|
|
c.config.Node.Attributes[attrName] = newVal
|
|
}
|
|
}
|
|
}
|
|
|
|
// COMPAT Remove in Nomad 0.10
|
|
// We maintain the driver enabled attribute until all drivers expose
|
|
// their attributes as DriverInfo
|
|
driverName := fmt.Sprintf("driver.%s", name)
|
|
if info.Detected {
|
|
c.config.Node.Attributes[driverName] = "1"
|
|
} else {
|
|
delete(c.config.Node.Attributes, driverName)
|
|
}
|
|
|
|
return hasChanged
|
|
}
|
|
|
|
// updateNodeFromFingerprint updates the node with the result of
|
|
// fingerprinting the node from the diff that was created
|
|
func (c *Client) updateNodeFromDevices(devices []*structs.NodeDeviceResource) {
|
|
c.configLock.Lock()
|
|
defer c.configLock.Unlock()
|
|
|
|
// Not updating node.Resources: the field is deprecated and includes
|
|
// dispatched task resources and not appropriate for expressing
|
|
// node available device resources
|
|
if c.updateNodeFromDevicesLocked(devices) {
|
|
c.updateNodeLocked()
|
|
}
|
|
}
|
|
|
|
// updateNodeFromDevicesLocked updates the node with the results of devices,
|
|
// but does send the update to the server. c.configLock must be held before
|
|
// calling this func
|
|
func (c *Client) updateNodeFromDevicesLocked(devices []*structs.NodeDeviceResource) bool {
|
|
if !structs.DevicesEquals(c.config.Node.NodeResources.Devices, devices) {
|
|
c.logger.Debug("new devices detected", "devices", len(devices))
|
|
c.config.Node.NodeResources.Devices = devices
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// batchNodeUpdates allows for batching multiple Node updates from fingerprinting.
|
|
// Once ready, the batches can be flushed and toggled to stop batching and forward
|
|
// all updates to a configured callback to be performed incrementally
|
|
type batchNodeUpdates struct {
|
|
// access to driver fields must hold driversMu lock
|
|
drivers map[string]*structs.DriverInfo
|
|
driversBatched bool
|
|
driverCB drivermanager.UpdateNodeDriverInfoFn
|
|
driversMu sync.Mutex
|
|
|
|
// access to devices fields must hold devicesMu lock
|
|
devices []*structs.NodeDeviceResource
|
|
devicesBatched bool
|
|
devicesCB devicemanager.UpdateNodeDevicesFn
|
|
devicesMu sync.Mutex
|
|
}
|
|
|
|
func newBatchNodeUpdates(
|
|
driverCB drivermanager.UpdateNodeDriverInfoFn,
|
|
devicesCB devicemanager.UpdateNodeDevicesFn) *batchNodeUpdates {
|
|
|
|
return &batchNodeUpdates{
|
|
drivers: make(map[string]*structs.DriverInfo),
|
|
driverCB: driverCB,
|
|
devices: []*structs.NodeDeviceResource{},
|
|
devicesCB: devicesCB,
|
|
}
|
|
}
|
|
|
|
// updateNodeFromDriver implements drivermanager.UpdateNodeDriverInfoFn and is
|
|
// used in the driver manager to send driver fingerprints to
|
|
func (b *batchNodeUpdates) updateNodeFromDriver(driver string, info *structs.DriverInfo) {
|
|
b.driversMu.Lock()
|
|
defer b.driversMu.Unlock()
|
|
if b.driversBatched {
|
|
b.driverCB(driver, info)
|
|
return
|
|
}
|
|
|
|
b.drivers[driver] = info
|
|
}
|
|
|
|
// batchDriverUpdates sends all of the batched driver node updates by calling f
|
|
// for each driver batched
|
|
func (b *batchNodeUpdates) batchDriverUpdates(f drivermanager.UpdateNodeDriverInfoFn) error {
|
|
b.driversMu.Lock()
|
|
defer b.driversMu.Unlock()
|
|
if b.driversBatched {
|
|
return fmt.Errorf("driver updates already batched")
|
|
}
|
|
|
|
b.driversBatched = true
|
|
for driver, info := range b.drivers {
|
|
f(driver, info)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// updateNodeFromDevices implements devicemanager.UpdateNodeDevicesFn and is
|
|
// used in the device manager to send device fingerprints to
|
|
func (b *batchNodeUpdates) updateNodeFromDevices(devices []*structs.NodeDeviceResource) {
|
|
b.devicesMu.Lock()
|
|
defer b.devicesMu.Unlock()
|
|
if b.devicesBatched {
|
|
b.devicesCB(devices)
|
|
return
|
|
}
|
|
|
|
b.devices = devices
|
|
}
|
|
|
|
// batchDevicesUpdates sends the batched device node updates by calling f with
|
|
// the devices
|
|
func (b *batchNodeUpdates) batchDevicesUpdates(f devicemanager.UpdateNodeDevicesFn) error {
|
|
b.devicesMu.Lock()
|
|
defer b.devicesMu.Unlock()
|
|
if b.devicesBatched {
|
|
return fmt.Errorf("devices updates already batched")
|
|
}
|
|
|
|
b.devicesBatched = true
|
|
f(b.devices)
|
|
return nil
|
|
}
|