Track plan rejection history and automatically mark clients as ineligible (#13421)

Plan rejections occur when the scheduler workers and the leader's plan
applier disagree on the feasibility of a plan. This may happen for valid
reasons: since Nomad schedules in parallel, it is expected that
different workers will have slightly different state when computing placements.

By the time the final plan reaches the leader's plan applier, it may no
longer be valid because a concurrent plan has already claimed the
intended resources. In these situations the plan applier notifies the
worker that the plan was rejected and that it should refresh its state
before trying again.

In some rare and unexpected circumstances it has been observed that
workers will repeatedly submit the same plan, even though it is always
rejected.

While the root cause is still unknown, this mitigation has been put in
place. The plan applier now tracks the history of plan rejections per
client and includes in the plan result a list of node IDs that should
be set as ineligible if the number of rejections within a given time
window crosses a certain threshold. The window size and threshold value
can be adjusted in the server configuration.

To avoid marking several nodes as ineligible at once, the operation is
rate limited to 5 nodes every 30 minutes, with an initial burst of 10
operations.
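
For reference, the tracker is controlled through the `plan_rejection_tracker`
block in the server configuration. A minimal sketch, using the default
threshold and window values introduced by this change:

```hcl
server {
  plan_rejection_tracker {
    enabled        = true
    node_threshold = 100
    node_window    = "5m"
  }
}
```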
Luiz Aoqui 2022-07-12 18:40:20 -04:00 committed by GitHub
parent 3e50f72fad
commit b656981cf0
21 changed files with 763 additions and 12 deletions

.changelog/13421.txt (new file)

@ -0,0 +1,7 @@
```release-note:improvement
core: automatically mark clients with recurring plan rejections as ineligible
```
```release-note:improvement
metrics: emit `nomad.nomad.plan.rejection_tracker.node_score` metric for the number of times a node had a plan rejection within the past time window
```


@ -439,6 +439,20 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
return nil, fmt.Errorf("deploy_query_rate_limit must be greater than 0")
}
// Set plan rejection tracker configuration.
if planRejectConf := agentConfig.Server.PlanRejectionTracker; planRejectConf != nil {
if planRejectConf.Enabled != nil {
conf.NodePlanRejectionEnabled = *planRejectConf.Enabled
}
conf.NodePlanRejectionThreshold = planRejectConf.NodeThreshold
if planRejectConf.NodeWindow == 0 {
return nil, fmt.Errorf("plan_rejection_tracker.node_window must be greater than 0")
} else {
conf.NodePlanRejectionWindow = planRejectConf.NodeWindow
}
}
// Add Enterprise license configs
conf.LicenseEnv = agentConfig.Server.LicenseEnv
conf.LicensePath = agentConfig.Server.LicensePath


@ -366,6 +366,82 @@ func TestAgent_ServerConfig_Limits_OK(t *testing.T) {
}
}
func TestAgent_ServerConfig_PlanRejectionTracker(t *testing.T) {
ci.Parallel(t)
cases := []struct {
name string
trackerConfig *PlanRejectionTracker
expectedConfig *PlanRejectionTracker
expectedErr string
}{
{
name: "default",
trackerConfig: nil,
expectedConfig: &PlanRejectionTracker{
NodeThreshold: 100,
NodeWindow: 5 * time.Minute,
},
expectedErr: "",
},
{
name: "valid config",
trackerConfig: &PlanRejectionTracker{
Enabled: helper.BoolToPtr(true),
NodeThreshold: 123,
NodeWindow: 17 * time.Minute,
},
expectedConfig: &PlanRejectionTracker{
Enabled: helper.BoolToPtr(true),
NodeThreshold: 123,
NodeWindow: 17 * time.Minute,
},
expectedErr: "",
},
{
name: "invalid node window",
trackerConfig: &PlanRejectionTracker{
NodeThreshold: 123,
},
expectedErr: "node_window must be greater than 0",
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
config := DevConfig(nil)
require.NoError(t, config.normalizeAddrs())
if tc.trackerConfig != nil {
config.Server.PlanRejectionTracker = tc.trackerConfig
}
serverConfig, err := convertServerConfig(config)
if tc.expectedErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tc.expectedErr)
} else {
require.NoError(t, err)
if tc.expectedConfig.Enabled != nil {
require.Equal(t,
*tc.expectedConfig.Enabled,
serverConfig.NodePlanRejectionEnabled,
)
}
require.Equal(t,
tc.expectedConfig.NodeThreshold,
serverConfig.NodePlanRejectionThreshold,
)
require.Equal(t,
tc.expectedConfig.NodeWindow,
serverConfig.NodePlanRejectionWindow,
)
}
})
}
}
func TestAgent_ServerConfig_RaftMultiplier_Ok(t *testing.T) {
ci.Parallel(t)


@ -516,6 +516,10 @@ type ServerConfig struct {
// This value is ignored.
DefaultSchedulerConfig *structs.SchedulerConfiguration `hcl:"default_scheduler_config"`
// PlanRejectionTracker configures the node plan rejection tracker that
// detects potentially bad nodes.
PlanRejectionTracker *PlanRejectionTracker `hcl:"plan_rejection_tracker"`
// EnableEventBroker configures whether this server's state store
// will generate events for its event stream.
EnableEventBroker *bool `hcl:"enable_event_broker"`
@ -561,6 +565,53 @@ type RaftBoltConfig struct {
NoFreelistSync bool `hcl:"no_freelist_sync"`
}
// PlanRejectionTracker is used in servers to configure the plan rejection
// tracker.
type PlanRejectionTracker struct {
// Enabled controls if the plan rejection tracker is active or not.
Enabled *bool `hcl:"enabled"`
// NodeThreshold is the number of times a node can have plan rejections
// before it is marked as ineligible.
NodeThreshold int `hcl:"node_threshold"`
// NodeWindow is the time window used to track active plan rejections for
// nodes.
NodeWindow time.Duration
NodeWindowHCL string `hcl:"node_window" json:"-"`
// ExtraKeysHCL is used by hcl to surface unexpected keys
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
func (p *PlanRejectionTracker) Merge(b *PlanRejectionTracker) *PlanRejectionTracker {
if p == nil {
return b
}
result := *p
if b == nil {
return &result
}
if b.Enabled != nil {
result.Enabled = b.Enabled
}
if b.NodeThreshold != 0 {
result.NodeThreshold = b.NodeThreshold
}
if b.NodeWindow != 0 {
result.NodeWindow = b.NodeWindow
}
if b.NodeWindowHCL != "" {
result.NodeWindowHCL = b.NodeWindowHCL
}
return &result
}
// Search is used in servers to configure search API options.
type Search struct {
// FuzzyEnabled toggles whether the FuzzySearch API is enabled. If not
@ -998,6 +1049,11 @@ func DefaultConfig() *Config {
EventBufferSize: helper.IntToPtr(100),
RaftProtocol: 3,
StartJoin: []string{},
PlanRejectionTracker: &PlanRejectionTracker{
Enabled: helper.BoolToPtr(false),
NodeThreshold: 100,
NodeWindow: 5 * time.Minute,
},
ServerJoin: &ServerJoin{
RetryJoin: []string{},
RetryInterval: 30 * time.Second,
@ -1608,6 +1664,10 @@ func (s *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
result.EventBufferSize = b.EventBufferSize
}
if b.PlanRejectionTracker != nil {
result.PlanRejectionTracker = result.PlanRejectionTracker.Merge(b.PlanRejectionTracker)
}
if b.DefaultSchedulerConfig != nil {
c := *b.DefaultSchedulerConfig
result.DefaultSchedulerConfig = &c


@ -43,9 +43,12 @@ func ParseConfigFile(path string) (*Config, error) {
VaultRetry: &client.RetryConfig{},
},
},
Server: &ServerConfig{
PlanRejectionTracker: &PlanRejectionTracker{},
ServerJoin: &ServerJoin{},
},
ACL: &ACLConfig{},
Audit: &config.AuditConfig{},
Consul: &config.ConsulConfig{},
Autopilot: &config.AutopilotConfig{},
Telemetry: &Telemetry{},
@ -54,7 +57,7 @@ func ParseConfigFile(path string) (*Config, error) {
err = hcl.Decode(c, buf.String())
if err != nil {
return nil, fmt.Errorf("failed to decode HCL file %s: %w", path, err)
}
// convert strings to time.Durations
@ -66,6 +69,7 @@ func ParseConfigFile(path string) (*Config, error) {
{"server.heartbeat_grace", &c.Server.HeartbeatGrace, &c.Server.HeartbeatGraceHCL, nil},
{"server.min_heartbeat_ttl", &c.Server.MinHeartbeatTTL, &c.Server.MinHeartbeatTTLHCL, nil},
{"server.failover_heartbeat_ttl", &c.Server.FailoverHeartbeatTTL, &c.Server.FailoverHeartbeatTTLHCL, nil},
{"server.plan_rejection_tracker.node_window", &c.Server.PlanRejectionTracker.NodeWindow, &c.Server.PlanRejectionTracker.NodeWindowHCL, nil},
{"server.retry_interval", &c.Server.RetryInterval, &c.Server.RetryIntervalHCL, nil},
{"server.server_join.retry_interval", &c.Server.ServerJoin.RetryInterval, &c.Server.ServerJoin.RetryIntervalHCL, nil},
{"consul.timeout", &c.Consul.Timeout, &c.Consul.TimeoutHCL, nil},


@ -126,6 +126,12 @@ var basicConfig = &Config{
EncryptKey: "abc",
EnableEventBroker: helper.BoolToPtr(false),
EventBufferSize: helper.IntToPtr(200),
PlanRejectionTracker: &PlanRejectionTracker{
Enabled: helper.BoolToPtr(true),
NodeThreshold: 100,
NodeWindow: 41 * time.Minute,
NodeWindowHCL: "41m",
},
ServerJoin: &ServerJoin{
RetryJoin: []string{"1.1.1.1", "2.2.2.2"},
RetryInterval: time.Duration(15) * time.Second,
@ -543,6 +549,9 @@ func (c *Config) addDefaults() {
if c.Server.ServerJoin == nil {
c.Server.ServerJoin = &ServerJoin{}
}
if c.Server.PlanRejectionTracker == nil {
c.Server.PlanRejectionTracker = &PlanRejectionTracker{}
}
}
// Tests for a panic parsing json with an object of exactly
@ -620,6 +629,11 @@ var sample0 = &Config{
RetryJoin: []string{"10.0.0.101", "10.0.0.102", "10.0.0.103"},
EncryptKey: "sHck3WL6cxuhuY7Mso9BHA==",
ServerJoin: &ServerJoin{},
PlanRejectionTracker: &PlanRejectionTracker{
NodeThreshold: 100,
NodeWindow: 31 * time.Minute,
NodeWindowHCL: "31m",
},
},
ACL: &ACLConfig{
Enabled: true,
@ -710,6 +724,11 @@ var sample1 = &Config{
RetryJoin: []string{"10.0.0.101", "10.0.0.102", "10.0.0.103"},
EncryptKey: "sHck3WL6cxuhuY7Mso9BHA==",
ServerJoin: &ServerJoin{},
PlanRejectionTracker: &PlanRejectionTracker{
NodeThreshold: 100,
NodeWindow: 31 * time.Minute,
NodeWindowHCL: "31m",
},
},
ACL: &ACLConfig{
Enabled: true,


@ -148,6 +148,11 @@ func TestConfig_Merge(t *testing.T) {
UpgradeVersion: "foo",
EnableEventBroker: helper.BoolToPtr(false),
EventBufferSize: helper.IntToPtr(0),
PlanRejectionTracker: &PlanRejectionTracker{
Enabled: helper.BoolToPtr(true),
NodeThreshold: 100,
NodeWindow: 11 * time.Minute,
},
},
ACL: &ACLConfig{
Enabled: true,
@ -343,6 +348,11 @@ func TestConfig_Merge(t *testing.T) {
UpgradeVersion: "bar",
EnableEventBroker: helper.BoolToPtr(true),
EventBufferSize: helper.IntToPtr(100),
PlanRejectionTracker: &PlanRejectionTracker{
Enabled: helper.BoolToPtr(true),
NodeThreshold: 100,
NodeWindow: 11 * time.Minute,
},
},
ACL: &ACLConfig{
Enabled: true,


@ -133,6 +133,12 @@ server {
enable_event_broker = false
event_buffer_size = 200
plan_rejection_tracker {
enabled = true
node_threshold = 100
node_window = "41m"
}
server_join {
retry_join = ["1.1.1.1", "2.2.2.2"]
retry_max = 3


@ -278,6 +278,11 @@
"node_gc_threshold": "12h",
"non_voting_server": true,
"num_schedulers": 2,
"plan_rejection_tracker": {
"enabled": true,
"node_threshold": 100,
"node_window": "41m"
},
"raft_protocol": 3,
"raft_multiplier": 4,
"redundancy_zone": "foo",


@ -55,6 +55,10 @@
"bootstrap_expect": 3,
"enabled": true,
"encrypt": "sHck3WL6cxuhuY7Mso9BHA==",
"plan_rejection_tracker": {
"node_threshold": 100,
"node_window": "31m"
},
"retry_join": [
"10.0.0.101",
"10.0.0.102",


@ -20,6 +20,10 @@
"bootstrap_expect": 3,
"enabled": true,
"encrypt": "sHck3WL6cxuhuY7Mso9BHA==",
"plan_rejection_tracker": {
"node_threshold": 100,
"node_window": "31m"
},
"retry_join": [
"10.0.0.101",
"10.0.0.102",


@ -248,6 +248,17 @@ type Config struct {
// additional delay is selected from this range randomly.
EvalFailedFollowupDelayRange time.Duration
// NodePlanRejectionEnabled controls if node rejection tracker is enabled.
NodePlanRejectionEnabled bool
// NodePlanRejectionThreshold is the number of times a node must have a
// plan rejection before it is set as ineligible.
NodePlanRejectionThreshold int
// NodePlanRejectionWindow is the time window used to track plan
// rejections for nodes.
NodePlanRejectionWindow time.Duration
// MinHeartbeatTTL is the minimum time between heartbeats.
// This is used as a floor to prevent excessive updates.
MinHeartbeatTTL time.Duration
@ -415,6 +426,9 @@ func DefaultConfig() *Config {
MaxHeartbeatsPerSecond: 50.0,
HeartbeatGrace: 10 * time.Second,
FailoverHeartbeatTTL: 300 * time.Second,
NodePlanRejectionEnabled: false,
NodePlanRejectionThreshold: 15,
NodePlanRejectionWindow: 10 * time.Minute,
ConsulConfig: config.DefaultConsulConfig(),
VaultConfig: config.DefaultVaultConfig(),
RPCHoldTimeout: 5 * time.Second,


@ -25,20 +25,45 @@ type planner struct {
// planQueue is used to manage the submitted allocation
// plans that are waiting to be assessed by the leader
planQueue *PlanQueue
// badNodeTracker keeps a score for nodes that have plan rejections.
// Plan rejections are somewhat expected given Nomad's optimistic
// scheduling, but repeated rejections for the same node may indicate an
// undetected issue, so we need to track rejection history.
badNodeTracker BadNodeTracker
}
// newPlanner returns a new planner to be used for managing allocation plans.
func newPlanner(s *Server) (*planner, error) {
log := s.logger.Named("planner")
// Create a plan queue
planQueue, err := NewPlanQueue()
if err != nil {
return nil, err
}
// Create the bad node tracker.
var badNodeTracker BadNodeTracker
if s.config.NodePlanRejectionEnabled {
config := DefaultCachedBadNodeTrackerConfig()
config.Window = s.config.NodePlanRejectionWindow
config.Threshold = s.config.NodePlanRejectionThreshold
badNodeTracker, err = NewCachedBadNodeTracker(log, config)
if err != nil {
return nil, err
}
} else {
badNodeTracker = &NoopBadNodeTracker{}
}
return &planner{
Server: s,
log: log,
planQueue: planQueue,
badNodeTracker: badNodeTracker,
}, nil
}
@ -144,6 +169,13 @@ func (p *planner) planApply() {
continue
}
// Check if any of the rejected nodes should be made ineligible.
for _, nodeID := range result.RejectedNodes {
if p.badNodeTracker.Add(nodeID) {
result.IneligibleNodes = append(result.IneligibleNodes, nodeID)
}
}
// Fast-path the response if there is nothing to do
if result.IsNoOp() {
pending.respond(result, nil)
@ -207,6 +239,8 @@ func (p *planner) snapshotMinIndex(prevPlanResultIndex, planSnapshotIndex uint64
// applyPlan is used to apply the plan result and to return the alloc index
func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) {
now := time.Now().UTC().UnixNano()
// Setup the update request
req := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
@ -214,11 +248,12 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
},
Deployment: result.Deployment,
DeploymentUpdates: result.DeploymentUpdates,
IneligibleNodes: result.IneligibleNodes,
EvalID: plan.EvalID,
UpdatedAt: now,
}
preemptedJobIDs := make(map[structs.NamespacedID]struct{})
if ServersMeetMinimumVersion(p.Members(), MinVersionPlanNormalization, true) {
// Initialize the allocs request using the new optimized log entry format.
@ -493,6 +528,7 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan
// errors since we are processing in parallel.
var mErr multierror.Error
partialCommit := false
rejectedNodes := make(map[string]struct{}, 0)
// handleResult is used to process the result of evaluateNodePlan
handleResult := func(nodeID string, fit bool, reason string, err error) (cancel bool) {
@ -516,8 +552,11 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan
"node_id", nodeID, "reason", reason, "eval_id", plan.EvalID,
"namespace", plan.Job.Namespace)
}
// Set that this is a partial commit
// Set that this is a partial commit and store the node that was
// rejected so the plan applier can detect repeated plan rejections
// for the same node.
partialCommit = true
rejectedNodes[nodeID] = struct{}{}
// If we require all-at-once scheduling, there is no point
// to continue the evaluation, as we've already failed.
@ -622,6 +661,10 @@ OUTER:
// placed but wasn't actually placed
correctDeploymentCanaries(result)
}
for n := range rejectedNodes {
result.RejectedNodes = append(result.RejectedNodes, n)
}
return result, mErr.ErrorOrNil()
}


@ -0,0 +1,212 @@
package nomad
import (
"fmt"
"time"
metrics "github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
lru "github.com/hashicorp/golang-lru"
"github.com/hashicorp/nomad/helper"
"golang.org/x/time/rate"
)
type BadNodeTracker interface {
Add(string) bool
EmitStats(time.Duration, <-chan struct{})
}
// NoopBadNodeTracker is a no-op implementation of bad node tracker that is
// used when tracking is disabled.
type NoopBadNodeTracker struct{}
func (n *NoopBadNodeTracker) EmitStats(time.Duration, <-chan struct{}) {}
func (n *NoopBadNodeTracker) Add(string) bool {
return false
}
// CachedBadNodeTracker keeps a record of nodes marked as bad by the plan
// applier in a LRU cache.
//
// It takes a time window and a threshold value. Plan rejections for a node
// will be registered with its timestamp. If the number of rejections within
// the time window is greater than the threshold the node is reported as bad.
//
// The tracker uses a fixed size cache that evicts old entries based on access
// frequency and recency.
type CachedBadNodeTracker struct {
logger hclog.Logger
cache *lru.TwoQueueCache
limiter *rate.Limiter
window time.Duration
threshold int
}
type CachedBadNodeTrackerConfig struct {
CacheSize int
RateLimit float64
BurstSize int
Window time.Duration
Threshold int
}
func DefaultCachedBadNodeTrackerConfig() CachedBadNodeTrackerConfig {
return CachedBadNodeTrackerConfig{
CacheSize: 50,
// Limit marking 5 nodes per 30min as ineligible with an initial
// burst of 10 nodes.
RateLimit: 5.0 / (30 * 60),
BurstSize: 10,
// Consider a node as bad if it is added more than 100 times in a 5min
// window period.
Window: 5 * time.Minute,
Threshold: 100,
}
}
// NewCachedBadNodeTracker returns a new CachedBadNodeTracker.
func NewCachedBadNodeTracker(logger hclog.Logger, config CachedBadNodeTrackerConfig) (*CachedBadNodeTracker, error) {
log := logger.Named("bad_node_tracker").
With("window", config.Window).
With("threshold", config.Threshold)
cache, err := lru.New2Q(config.CacheSize)
if err != nil {
return nil, fmt.Errorf("failed to create new bad node tracker: %v", err)
}
return &CachedBadNodeTracker{
logger: log,
cache: cache,
limiter: rate.NewLimiter(rate.Limit(config.RateLimit), config.BurstSize),
window: config.Window,
threshold: config.Threshold,
}, nil
}
// Add records a new rejection for a node and returns true if the number of
// rejections reaches the threshold.
//
// If it's the first time the node is added it will be included in the internal
// cache. If the cache is full the least recently updated or accessed node is
// evicted.
func (c *CachedBadNodeTracker) Add(nodeID string) bool {
value, ok := c.cache.Get(nodeID)
if !ok {
value = newBadNodeStats(nodeID, c.window)
c.cache.Add(nodeID, value)
}
stats := value.(*badNodeStats)
now := time.Now()
stats.record(now)
return c.isBad(now, stats)
}
// EmitStats generates metrics for the bad nodes being currently tracked. Must
// be called in a goroutine.
func (c *CachedBadNodeTracker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
timer, stop := helper.NewSafeTimer(period)
defer stop()
for {
timer.Reset(period)
select {
case <-timer.C:
c.emitStats()
case <-stopCh:
return
}
}
}
// isBad returns true if the node has more rejections than the threshold within
// the time window.
func (c *CachedBadNodeTracker) isBad(t time.Time, stats *badNodeStats) bool {
score := stats.score(t)
logger := c.logger.With("node_id", stats.id, "score", score)
logger.Trace("checking if node is bad")
if score >= c.threshold {
// Limit the number of nodes we report as bad to avoid mass assigning
// nodes as ineligible, but do it after Get to keep the cache entry
// fresh.
if !c.limiter.Allow() {
logger.Trace("node is bad, but returning false due to rate limiting")
return false
}
return true
}
return false
}
func (c *CachedBadNodeTracker) emitStats() {
now := time.Now()
for _, k := range c.cache.Keys() {
value, _ := c.cache.Get(k)
stats := value.(*badNodeStats)
score := stats.score(now)
labels := []metrics.Label{
{Name: "node_id", Value: k.(string)},
}
metrics.SetGaugeWithLabels([]string{"nomad", "plan", "rejection_tracker", "node_score"}, float32(score), labels)
}
}
// badNodeStats represents a node being tracked by a BadNodeTracker.
type badNodeStats struct {
id string
history []time.Time
window time.Duration
}
// newBadNodeStats returns an empty badNodeStats.
func newBadNodeStats(id string, window time.Duration) *badNodeStats {
return &badNodeStats{
id: id,
window: window,
}
}
// score returns the number of rejections within the past time window.
func (s *badNodeStats) score(t time.Time) int {
active, expired := s.countActive(t)
// Remove expired records.
if expired > 0 {
s.history = s.history[expired:]
}
return active
}
// record adds a new entry to the stats history.
func (s *badNodeStats) record(t time.Time) {
s.history = append(s.history, t)
}
// countActive returns the number of records that happened after the time
// window started (active) and before (expired).
func (s *badNodeStats) countActive(t time.Time) (int, int) {
windowStart := t.Add(-s.window)
// Assume all values are expired and move back from history until we find
// a record that actually happened before the window started.
expired := len(s.history)
for ; expired > 0; expired-- {
i := expired - 1
ts := s.history[i]
if ts.Before(windowStart) {
break
}
}
active := len(s.history) - expired
return active, expired
}


@ -0,0 +1,164 @@
package nomad
import (
"fmt"
"testing"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/ci"
"github.com/stretchr/testify/require"
)
func TestCachedBadNodeTracker(t *testing.T) {
ci.Parallel(t)
config := DefaultCachedBadNodeTrackerConfig()
config.CacheSize = 3
tracker, err := NewCachedBadNodeTracker(hclog.NewNullLogger(), config)
require.NoError(t, err)
for i := 0; i < 10; i++ {
tracker.Add(fmt.Sprintf("node-%d", i+1))
}
require.Equal(t, config.CacheSize, tracker.cache.Len())
// Only track the most recent values.
expected := []string{"node-8", "node-9", "node-10"}
require.ElementsMatch(t, expected, tracker.cache.Keys())
}
func TestCachedBadNodeTracker_isBad(t *testing.T) {
ci.Parallel(t)
config := DefaultCachedBadNodeTrackerConfig()
config.CacheSize = 3
config.Threshold = 4
tracker, err := NewCachedBadNodeTracker(hclog.NewNullLogger(), config)
require.NoError(t, err)
// Populate cache.
tracker.Add("node-1")
tracker.Add("node-2")
tracker.Add("node-2")
tracker.Add("node-3")
tracker.Add("node-3")
tracker.Add("node-3")
tracker.Add("node-3")
tracker.Add("node-3")
tracker.Add("node-3")
testCases := []struct {
name string
nodeID string
bad bool
}{
{
name: "node-1 is not bad",
nodeID: "node-1",
bad: false,
},
{
name: "node-3 is bad",
nodeID: "node-3",
bad: true,
},
}
now := time.Now()
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Read value from the cache.
v, ok := tracker.cache.Get(tc.nodeID)
require.True(t, ok)
// Check if it's bad.
stats := v.(*badNodeStats)
got := tracker.isBad(now, stats)
require.Equal(t, tc.bad, got)
})
}
future := time.Now().Add(2 * config.Window)
nodes := []string{"node-1", "node-2", "node-3"}
for _, n := range nodes {
t.Run(fmt.Sprintf("%s cache expires", n), func(t *testing.T) {
v, ok := tracker.cache.Get(n)
require.True(t, ok)
stats := v.(*badNodeStats)
bad := tracker.isBad(future, stats)
require.False(t, bad)
})
}
}
func TestCachedBadNodeTracker_rateLimit(t *testing.T) {
ci.Parallel(t)
config := DefaultCachedBadNodeTrackerConfig()
config.Threshold = 3
config.RateLimit = float64(1) // Get a new token every second.
config.BurstSize = 3
tracker, err := NewCachedBadNodeTracker(hclog.NewNullLogger(), config)
require.NoError(t, err)
tracker.Add("node-1")
tracker.Add("node-1")
tracker.Add("node-1")
tracker.Add("node-1")
tracker.Add("node-1")
v, ok := tracker.cache.Get("node-1")
require.True(t, ok)
stats := v.(*badNodeStats)
// Burst allows for max 3 operations.
now := time.Now()
require.True(t, tracker.isBad(now, stats))
require.True(t, tracker.isBad(now, stats))
require.True(t, tracker.isBad(now, stats))
require.False(t, tracker.isBad(now, stats))
// Wait for a new token.
time.Sleep(time.Second)
now = time.Now()
require.True(t, tracker.isBad(now, stats))
require.False(t, tracker.isBad(now, stats))
}
func TestBadNodeStats_score(t *testing.T) {
ci.Parallel(t)
window := time.Minute
stats := newBadNodeStats("node-1", window)
now := time.Now()
require.Equal(t, 0, stats.score(now))
stats.record(now)
stats.record(now)
stats.record(now)
require.Equal(t, 3, stats.score(now))
require.Len(t, stats.history, 3)
halfWindow := now.Add(window / 2)
stats.record(halfWindow)
require.Equal(t, 4, stats.score(halfWindow))
require.Len(t, stats.history, 4)
fullWindow := now.Add(window).Add(time.Second)
require.Equal(t, 1, stats.score(fullWindow))
require.Len(t, stats.history, 1)
afterWindow := now.Add(2 * window)
require.Equal(t, 0, stats.score(afterWindow))
require.Len(t, stats.history, 0)
}


@ -481,6 +481,9 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr
// Emit metrics for the plan queue
go s.planQueue.EmitStats(time.Second, s.shutdownCh)
// Emit metrics for the planner's bad node tracker.
go s.planner.badNodeTracker.EmitStats(time.Second, s.shutdownCh)
// Emit metrics for the blocked eval tracker.
go s.blockedEvals.EmitStats(time.Second, s.shutdownCh)


@ -35,6 +35,17 @@ const (
)
const (
// NodeEligibilityEventPlanRejectThreshold is the message used when the node
// is set to ineligible due to multiple plan failures.
// This is a preventive measure to signal scheduler workers to not consider
// the node for future placements.
// Plan rejections for a node are expected due to the optimistic and
// concurrent nature of the scheduling process, but repeated failures for
// the same node may indicate an underlying issue not detected by Nomad.
// The plan applier keeps track of plan rejection history and will mark
// nodes as ineligible if they cross a given threshold.
NodeEligibilityEventPlanRejectThreshold = "Node marked as ineligible for scheduling due to multiple plan rejections, refer to https://www.nomadproject.io/s/port-plan-failure for more information"
// NodeRegisterEventRegistered is the message used when the node becomes
// registered.
NodeRegisterEventRegistered = "Node registered"
@ -360,6 +371,21 @@ func (s *StateStore) UpsertPlanResults(msgType structs.MessageType, index uint64
txn := s.db.WriteTxnMsgT(msgType, index)
defer txn.Abort()
// Mark nodes as ineligible.
for _, nodeID := range results.IneligibleNodes {
s.logger.Warn("marking node as ineligible due to multiple plan rejections, refer to https://www.nomadproject.io/s/port-plan-failure for more information", "node_id", nodeID)
nodeEvent := structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemScheduler).
SetMessage(NodeEligibilityEventPlanRejectThreshold)
err := s.updateNodeEligibilityImpl(index, nodeID,
structs.NodeSchedulingIneligible, results.UpdatedAt, nodeEvent, txn)
if err != nil {
return err
}
}
// Upsert the newly created or updated deployment
if results.Deployment != nil {
if err := s.upsertDeploymentImpl(index, results.Deployment, txn); err != nil {
@ -1137,10 +1163,15 @@ func (s *StateStore) updateNodeDrainImpl(txn *txn, index uint64, nodeID string,
// UpdateNodeEligibility is used to update the scheduling eligibility of a node
func (s *StateStore) UpdateNodeEligibility(msgType structs.MessageType, index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent) error {
txn := s.db.WriteTxnMsgT(msgType, index)
defer txn.Abort()
if err := s.updateNodeEligibilityImpl(index, nodeID, eligibility, updatedAt, event, txn); err != nil {
return err
}
return txn.Commit()
}
func (s *StateStore) updateNodeEligibilityImpl(index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent, txn *txn) error {
// Lookup the node
existing, err := txn.First("nodes", "id", nodeID)
if err != nil {
@ -1177,7 +1208,7 @@ func (s *StateStore) UpdateNodeEligibility(msgType structs.MessageType, index ui
return fmt.Errorf("index update failed: %v", err)
}
return nil
}
// UpsertNodeEvents adds the node events to the nodes, rotating events as

View File

@ -931,6 +931,14 @@ type ApplyPlanResultsRequest struct {
// PreemptionEvals is a slice of follow up evals for jobs whose allocations
// have been preempted to place allocs in this plan
PreemptionEvals []*Evaluation
// IneligibleNodes are nodes the plan applier has repeatedly rejected
// placements for and should therefore be considered ineligible by workers
// to avoid retrying them repeatedly.
IneligibleNodes []string
// UpdatedAt represents server time of receiving request.
UpdatedAt int64
}
// AllocUpdateRequest is used to submit changes to allocations, either
@ -1650,6 +1658,7 @@ const (
NodeEventSubsystemDriver = "Driver"
NodeEventSubsystemHeartbeat = "Heartbeat"
NodeEventSubsystemCluster = "Cluster"
NodeEventSubsystemScheduler = "Scheduler"
NodeEventSubsystemStorage = "Storage"
)
@ -11477,6 +11486,16 @@ type PlanResult struct {
// as stopped.
NodePreemptions map[string][]*Allocation
// RejectedNodes are nodes the scheduler worker has rejected placements for
// and should be considered for ineligibility by the plan applier to avoid
// retrying them repeatedly.
RejectedNodes []string
// IneligibleNodes are nodes the plan applier has repeatedly rejected
// placements for and should therefore be considered ineligible by workers
// to avoid retrying them repeatedly.
IneligibleNodes []string
// RefreshIndex is the index the worker should refresh state up to.
// This allows all evictions and allocations to be materialized.
// If any allocations were rejected due to stale data (node state,
@ -11490,8 +11509,9 @@ type PlanResult struct {
// IsNoOp checks if this plan result would do nothing
func (p *PlanResult) IsNoOp() bool {
return len(p.IneligibleNodes) == 0 && len(p.NodeUpdate) == 0 &&
len(p.NodeAllocation) == 0 && len(p.DeploymentUpdates) == 0 &&
p.Deployment == nil
}
// FullCommit is used to check if all the allocations in a plan


@ -156,6 +156,10 @@ server {
disallow this server from making any scheduling decisions. This defaults to
the number of CPU cores.
- `plan_rejection_tracker` <code>([PlanRejectionTracker](#plan_rejection_tracker-parameters))</code> -
Configuration for the plan rejection tracker that the Nomad leader uses to
track the history of plan rejections.
- `raft_boltdb` - This is a nested object that allows configuring options for
Raft's BoltDB based log store.
- `no_freelist_sync` - Setting this to `true` will disable syncing the BoltDB
@ -239,6 +243,30 @@ server {
section for more information on the format of the string. This field is
deprecated in favor of the [server_join stanza][server-join].
### `plan_rejection_tracker` Parameters
The leader plan rejection tracker can be adjusted to prevent evaluations from
getting stuck because placements are repeatedly attempted against a client
that may have an unexpected issue. Refer to
[Monitoring Nomad][monitoring_nomad_progress] for more details.
- `enabled` `(bool: false)` - Specifies if plan rejections should be tracked.
- `node_threshold` `(int: 100)` - The number of plan rejections for a node
  within the `node_window` required before the client is set as ineligible.
- `node_window` `(string: "5m")` - The time window within which plan rejections
  for a node are considered.
If you observe too many false positives (clients being marked as ineligible
even though they are healthy), you may want to increase `node_threshold`.

Conversely, if jobs are not being scheduled due to plan rejections for the
same `node_id` but the client is never set as ineligible, you can try
increasing `node_window` so that more historical rejections are taken into
account.
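
As a sketch, a cluster that sees occasional false positives might raise the
threshold while keeping the default window (the values below are illustrative,
not recommendations):

```hcl
server {
  plan_rejection_tracker {
    enabled        = true
    node_threshold = 300
    node_window    = "5m"
  }
}
```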
## `server` Examples
### Common Setup
@ -331,5 +359,6 @@ server {
[update-scheduler-config]: /api-docs/operator/scheduler#update-scheduler-configuration 'Scheduler Config'
[bootstrapping a cluster]: /docs/faq#bootstrapping
[rfc4648]: https://tools.ietf.org/html/rfc4648#section-5
[monitoring_nomad_progress]: /docs/operations/monitoring-nomad#progress
[`nomad operator keygen`]: /docs/commands/operator/keygen
[search]: /docs/configuration/search


@ -394,6 +394,7 @@ those listed in [Key Metrics](#key-metrics) above.
| `nomad.nomad.plan.apply` | Time elapsed to apply a plan | Nanoseconds | Summary | host |
| `nomad.nomad.plan.evaluate` | Time elapsed to evaluate a plan | Nanoseconds | Summary | host |
| `nomad.nomad.plan.node_rejected` | Number of times a node has had a plan rejected | Integer | Counter | host, node_id |
| `nomad.nomad.plan.rejection_tracker.node_score` | Number of times a node has had a plan rejected within the tracker window | Integer | Gauge | host, node_id |
| `nomad.nomad.plan.queue_depth` | Count of evals in the plan queue | Integer | Gauge | host |
| `nomad.nomad.plan.submit` | Time elapsed for `Plan.Submit` RPC call | Nanoseconds | Summary | host |
| `nomad.nomad.plan.wait_for_index` | Time elapsed that planner waits for the raft index of the plan to be processed | Nanoseconds | Summary | host |


@ -149,10 +149,34 @@ While it is possible for these log lines to occur infrequently due to normal
cluster conditions, they should not appear repeatedly and prevent the job from
eventually running (look up the evaluation ID logged to find the job).
#### Plan rejection tracker
Nomad provides a mechanism to track the history of plan rejections per client
and mark them as ineligible if the number goes above a given threshold within a
time window. This functionality can be enabled using the
[`plan_rejection_tracker`] server configuration.
When a node is marked as ineligible due to excessive plan rejections, the
following node event is registered:
```
Node marked as ineligible for scheduling due to multiple plan rejections, refer to https://www.nomadproject.io/s/port-plan-failure for more information
```
Along with the log line:
```
[WARN] nomad.state_store: marking node as ineligible due to multiple plan rejections: node_id=67af2541-5e96-6f54-9095-11089d627626
```
If a client is marked as ineligible due to repeated plan rejections, try
[draining] the node and shutting it down. Misconfigurations not caught by
validation can cause nodes to enter this state: [#11830][gh-11830].
If the `plan for node rejected` log *does* appear repeatedly with the same
`node_id` referenced, but the client is not being set as ineligible, you can
try adjusting the [`plan_rejection_tracker`] configuration of servers.
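
For example, widening the tracking window makes the tracker take more
rejection history into account before marking a client as ineligible (a
minimal sketch with illustrative values):

```hcl
server {
  plan_rejection_tracker {
    enabled     = true
    node_window = "10m"
  }
}
```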
### Performance
The following metrics allow observing changes in throughput at the various
@ -278,6 +302,7 @@ latency and packet loss for the [Serf] address.
[metric-types]: /docs/telemetry/metrics#metric-types
[metrics-api-endpoint]: /api-docs/metrics
[prometheus-telem]: /docs/configuration/telemetry#prometheus
[`plan_rejection_tracker`]: /docs/configuration/server#plan_rejection_tracker
[serf]: /docs/configuration#serf-1
[statsd-exporter]: https://github.com/prometheus/statsd_exporter
[statsd-telem]: /docs/configuration/telemetry#statsd