drivermanager: attempt to reattach and shutdown driver plugin if blocked by allow/block lists
This commit is contained in:
parent
ce1a5cba0e
commit
a02308ee6a
|
@ -911,7 +911,7 @@ func (c *Client) restoreState() error {
|
|||
arConf := &allocrunner.Config{
|
||||
Alloc: alloc,
|
||||
Logger: c.logger,
|
||||
ClientConfig: c.config,
|
||||
ClientConfig: c.configCopy,
|
||||
StateDB: c.stateDB,
|
||||
StateUpdater: c,
|
||||
DeviceStatsReporter: c,
|
||||
|
@ -2009,7 +2009,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
|
|||
arConf := &allocrunner.Config{
|
||||
Alloc: alloc,
|
||||
Logger: c.logger,
|
||||
ClientConfig: c.config,
|
||||
ClientConfig: c.configCopy,
|
||||
StateDB: c.stateDB,
|
||||
Consul: c.consulService,
|
||||
Vault: c.vaultClient,
|
||||
|
|
|
@ -1199,7 +1199,8 @@ func TestClient_updateNodeFromDriverUpdatesAll(t *testing.T) {
|
|||
"node.mock.testattr1": "val1",
|
||||
},
|
||||
}
|
||||
n := client.updateNodeFromDriver("mock", info)
|
||||
client.updateNodeFromDriver("mock", info)
|
||||
n := client.config.Node
|
||||
|
||||
updatedInfo := *n.Drivers["mock"]
|
||||
// compare without update time
|
||||
|
@ -1220,7 +1221,8 @@ func TestClient_updateNodeFromDriverUpdatesAll(t *testing.T) {
|
|||
"node.mock.testattr1": "val2",
|
||||
},
|
||||
}
|
||||
n := client.updateNodeFromDriver("mock", info)
|
||||
client.updateNodeFromDriver("mock", info)
|
||||
n := client.Node()
|
||||
|
||||
updatedInfo := *n.Drivers["mock"]
|
||||
// compare without update time
|
||||
|
@ -1231,7 +1233,8 @@ func TestClient_updateNodeFromDriverUpdatesAll(t *testing.T) {
|
|||
assert.Equal(t, "val2", n.Attributes["node.mock.testattr1"])
|
||||
|
||||
// update once more with the same info, updateTime shouldn't change
|
||||
un := client.updateNodeFromDriver("mock", info)
|
||||
client.updateNodeFromDriver("mock", info)
|
||||
un := client.Node()
|
||||
assert.EqualValues(t, n, un)
|
||||
}
|
||||
|
||||
|
@ -1245,7 +1248,8 @@ func TestClient_updateNodeFromDriverUpdatesAll(t *testing.T) {
|
|||
"node.mock.testattr1": "",
|
||||
},
|
||||
}
|
||||
n := client.updateNodeFromDriver("mock", info)
|
||||
client.updateNodeFromDriver("mock", info)
|
||||
n := client.Node()
|
||||
|
||||
updatedInfo := *n.Drivers["mock"]
|
||||
// compare without update time
|
||||
|
@ -1256,7 +1260,8 @@ func TestClient_updateNodeFromDriverUpdatesAll(t *testing.T) {
|
|||
assert.Equal(t, "", n.Attributes["node.mock.testattr1"])
|
||||
|
||||
// update once more with the same info, updateTime shouldn't change
|
||||
un := client.updateNodeFromDriver("mock", info)
|
||||
client.updateNodeFromDriver("mock", info)
|
||||
un := client.Node()
|
||||
assert.EqualValues(t, n, un)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,7 +13,7 @@ import (
|
|||
|
||||
var (
|
||||
// batchFirstFingerprintsTimeout is the maximum amount of time to wait for
|
||||
// intial fingerprinting to complete before sending a batched Node update
|
||||
// initial fingerprinting to complete before sending a batched Node update
|
||||
batchFirstFingerprintsTimeout = 5 * time.Second
|
||||
)
|
||||
|
||||
|
@ -43,7 +43,10 @@ SEND_BATCH:
|
|||
var driverChanged bool
|
||||
c.batchNodeUpdates.batchDriverUpdates(func(driver string, info *structs.DriverInfo) {
|
||||
if c.updateNodeFromDriverLocked(driver, info) {
|
||||
c.config.Node.Drivers[driver].UpdateTime = time.Now()
|
||||
c.config.Node.Drivers[driver] = info
|
||||
if c.config.Node.Drivers[driver].UpdateTime.IsZero() {
|
||||
c.config.Node.Drivers[driver].UpdateTime = time.Now()
|
||||
}
|
||||
driverChanged = true
|
||||
}
|
||||
})
|
||||
|
@ -56,7 +59,7 @@ SEND_BATCH:
|
|||
}
|
||||
})
|
||||
|
||||
// only update the node if changes occured
|
||||
// only update the node if changes occurred
|
||||
if driverChanged || devicesChanged {
|
||||
c.updateNodeLocked()
|
||||
}
|
||||
|
@ -69,7 +72,10 @@ func (c *Client) updateNodeFromDriver(name string, info *structs.DriverInfo) {
|
|||
defer c.configLock.Unlock()
|
||||
|
||||
if c.updateNodeFromDriverLocked(name, info) {
|
||||
c.config.Node.Drivers[name].UpdateTime = time.Now()
|
||||
c.config.Node.Drivers[name] = info
|
||||
if c.config.Node.Drivers[name].UpdateTime.IsZero() {
|
||||
c.config.Node.Drivers[name].UpdateTime = time.Now()
|
||||
}
|
||||
c.updateNodeLocked()
|
||||
}
|
||||
}
|
||||
|
@ -84,7 +90,6 @@ func (c *Client) updateNodeFromDriverLocked(name string, info *structs.DriverInf
|
|||
if !hadDriver {
|
||||
// If the driver info has not yet been set, do that here
|
||||
hasChanged = true
|
||||
c.config.Node.Drivers[name] = info
|
||||
for attrName, newVal := range info.Attributes {
|
||||
c.config.Node.Attributes[attrName] = newVal
|
||||
}
|
||||
|
@ -93,9 +98,9 @@ func (c *Client) updateNodeFromDriverLocked(name string, info *structs.DriverInf
|
|||
// The driver info has already been set, fix it up
|
||||
if oldVal.Detected != info.Detected {
|
||||
hasChanged = true
|
||||
c.config.Node.Drivers[name].Detected = info.Detected
|
||||
}
|
||||
|
||||
// If health state has change, trigger node event
|
||||
if oldVal.Healthy != info.Healthy || oldVal.HealthDescription != info.HealthDescription {
|
||||
hasChanged = true
|
||||
if info.HealthDescription != "" {
|
||||
|
@ -107,8 +112,6 @@ func (c *Client) updateNodeFromDriverLocked(name string, info *structs.DriverInf
|
|||
}
|
||||
c.triggerNodeEvent(event)
|
||||
}
|
||||
// Update the node with the latest information
|
||||
c.config.Node.Drivers[name].MergeHealthCheck(info)
|
||||
}
|
||||
|
||||
for attrName, newVal := range info.Attributes {
|
||||
|
@ -225,7 +228,7 @@ func (b *batchNodeUpdates) batchDriverUpdates(f drivermanager.UpdateNodeDriverIn
|
|||
}
|
||||
|
||||
// updateNodeFromDevices implements devicemanager.UpdateNodeDevicesFn and is
|
||||
// used in teh device manager to send device fingerprints to
|
||||
// used in the device manager to send device fingerprints to
|
||||
func (b *batchNodeUpdates) updateNodeFromDevices(devices []*structs.NodeDeviceResource) {
|
||||
b.devicesMu.Lock()
|
||||
defer b.devicesMu.Unlock()
|
||||
|
|
|
@ -274,6 +274,9 @@ func (m *manager) loadReattachConfigs() error {
|
|||
if s != nil {
|
||||
for name, c := range s.ReattachConfigs {
|
||||
if m.isDriverBlocked(name) {
|
||||
m.logger.Warn("reattach config for driver plugin found but driver is blocked due to allow/block list, killing plugin",
|
||||
"driver", name)
|
||||
m.shutdownBlockedDriver(name, c)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -288,6 +291,28 @@ func (m *manager) loadReattachConfigs() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// shutdownBlockedDriver is used to forcefully shutdown a running driver plugin
|
||||
// when it has been blocked due to allow/block lists
|
||||
func (m *manager) shutdownBlockedDriver(name string, reattach *shared.ReattachConfig) {
|
||||
c, err := shared.ReattachConfigToGoPlugin(reattach)
|
||||
if err != nil {
|
||||
m.logger.Warn("failed to reattach and kill blocked driver plugin",
|
||||
"driver", name, "error", err)
|
||||
return
|
||||
|
||||
}
|
||||
pluginInstance, err := m.loader.Reattach(name, base.PluginTypeDriver, c)
|
||||
if err != nil {
|
||||
m.logger.Warn("failed to reattach and kill blocked driver plugin",
|
||||
"driver", name, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
if !pluginInstance.Exited() {
|
||||
pluginInstance.Kill()
|
||||
}
|
||||
}
|
||||
|
||||
// storePluginReattachConfig is used as a callback to the instance managers and
|
||||
// persists thhe plugin reattach configurations.
|
||||
func (m *manager) storePluginReattachConfig(id loader.PluginID, c *plugin.ReattachConfig) error {
|
||||
|
|
|
@ -185,12 +185,14 @@ func TestMananger_TaskEvents(t *testing.T) {
|
|||
event1 := mockTaskEvent("abc1")
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
mgr.eventHandlerFactory = func(string, string) EventHandler {
|
||||
mgr.instancesMu.Lock()
|
||||
mgr.instances["mock"].eventHandlerFactory = func(string, string) EventHandler {
|
||||
return func(ev *drivers.TaskEvent) {
|
||||
defer wg.Done()
|
||||
assert.Exactly(t, event1, ev)
|
||||
}
|
||||
}
|
||||
mgr.instancesMu.Unlock()
|
||||
|
||||
evChan <- event1
|
||||
wg.Wait()
|
||||
|
|
Loading…
Reference in New Issue