From b656981cf037884afc54f74816bbf8c5ad1b2f3f Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Tue, 12 Jul 2022 18:40:20 -0400 Subject: [PATCH] Track plan rejection history and automatically mark clients as ineligible (#13421) Plan rejections occur when the scheduler workers and the leader plan applier disagree on the feasibility of a plan. This may happen for valid reasons: since Nomad does parallel scheduling, it is expected that different workers will have a different state when computing placements. As the final plan reaches the leader plan applier, it may no longer be valid due to concurrent scheduling taking up the intended resources. In these situations the plan applier will notify the worker that the plan was rejected and that it should refresh its state before trying again. In some rare and unexpected circumstances it has been observed that workers will repeatedly submit the same plan, even if it is always rejected. While the root cause is still unknown, this mitigation has been put in place. The plan applier will now track the history of plan rejections per client and include in the plan result a list of node IDs that should be set as ineligible if the number of rejections in a given time window crosses a certain threshold. The window size and threshold value can be adjusted in the server configuration. To avoid marking several nodes as ineligible at once, the operation is rate limited to 5 nodes every 30min, with an initial burst of 10 operations. --- .changelog/13421.txt | 7 + command/agent/agent.go | 14 ++ command/agent/agent_test.go | 76 +++++++ command/agent/config.go | 60 +++++ command/agent/config_parse.go | 8 +- command/agent/config_parse_test.go | 19 ++ command/agent/config_test.go | 10 + command/agent/testdata/basic.hcl | 6 + command/agent/testdata/basic.json | 5 + command/agent/testdata/sample0.json | 4 + command/agent/testdata/sample1/sample0.json | 4 + nomad/config.go | 14 ++ nomad/plan_apply.go | 53 ++++- nomad/plan_apply_node_tracker.go | 212 ++++++++++++++++++ nomad/plan_apply_node_tracker_test.go | 164 ++++++++++++++ nomad/server.go | 3 + nomad/state/state_store.go | 35 ++- nomad/structs/structs.go | 24 +- website/content/docs/configuration/server.mdx | 29 +++ .../docs/operations/metrics-reference.mdx | 1 + .../docs/operations/monitoring-nomad.mdx | 27 ++- 21 files changed, 763 insertions(+), 12 deletions(-) create mode 100644 .changelog/13421.txt create mode 100644 nomad/plan_apply_node_tracker.go create mode 100644 nomad/plan_apply_node_tracker_test.go diff --git a/.changelog/13421.txt b/.changelog/13421.txt new file mode 100644 index 000000000..d37bb2d58 --- /dev/null +++ b/.changelog/13421.txt @@ -0,0 +1,7 @@ +```release-note:improvement +core: automatically mark clients with recurring plan rejections as ineligible +``` + +```release-note:improvement +metrics: emit `nomad.nomad.plan.rejection_tracker.node_score` metric for the number of times a node had a plan rejection within the past time window +``` diff --git a/command/agent/agent.go b/command/agent/agent.go index 0c8217d35..8bbaad8c5 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -439,6 +439,20 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) { return nil, fmt.Errorf("deploy_query_rate_limit must be greater than 0") } + // Set plan rejection tracker configuration.
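For intuition, the rate limit described in the commit message works out to one new ineligibility roughly every six minutes once the initial burst of 10 is spent. Below is a minimal, standalone sketch of that token-bucket behavior using the same `golang.org/x/time/rate` package the new tracker relies on; the constants mirror the defaults described above and the example is illustrative only, not part of the patch.

```go
package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

func main() {
	// 5 nodes per 30 minutes expressed as events per second, with an
	// initial burst of 10, matching the defaults described above.
	limiter := rate.NewLimiter(rate.Limit(5.0/(30*60)), 10)

	// Simulate 15 nodes crossing the rejection threshold at about the same
	// time: only the first 10 consume burst tokens; the rest are skipped
	// until the bucket refills at one token every 6 minutes.
	marked := 0
	for i := 0; i < 15; i++ {
		if limiter.Allow() {
			marked++
		}
	}
	fmt.Println("nodes marked ineligible immediately:", marked) // prints 10
}
```

The actual limiter is created in `NewCachedBadNodeTracker` and consulted in `isBad` further down in this diff.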
+ if planRejectConf := agentConfig.Server.PlanRejectionTracker; planRejectConf != nil { + if planRejectConf.Enabled != nil { + conf.NodePlanRejectionEnabled = *planRejectConf.Enabled + } + conf.NodePlanRejectionThreshold = planRejectConf.NodeThreshold + + if planRejectConf.NodeWindow == 0 { + return nil, fmt.Errorf("plan_rejection_tracker.node_window must be greater than 0") + } else { + conf.NodePlanRejectionWindow = planRejectConf.NodeWindow + } + } + // Add Enterprise license configs conf.LicenseEnv = agentConfig.Server.LicenseEnv conf.LicensePath = agentConfig.Server.LicensePath diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index a4845f85f..f0833a731 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -366,6 +366,82 @@ func TestAgent_ServerConfig_Limits_OK(t *testing.T) { } } +func TestAgent_ServerConfig_PlanRejectionTracker(t *testing.T) { + ci.Parallel(t) + + cases := []struct { + name string + trackerConfig *PlanRejectionTracker + expectedConfig *PlanRejectionTracker + expectedErr string + }{ + { + name: "default", + trackerConfig: nil, + expectedConfig: &PlanRejectionTracker{ + NodeThreshold: 100, + NodeWindow: 5 * time.Minute, + }, + expectedErr: "", + }, + { + name: "valid config", + trackerConfig: &PlanRejectionTracker{ + Enabled: helper.BoolToPtr(true), + NodeThreshold: 123, + NodeWindow: 17 * time.Minute, + }, + expectedConfig: &PlanRejectionTracker{ + Enabled: helper.BoolToPtr(true), + NodeThreshold: 123, + NodeWindow: 17 * time.Minute, + }, + expectedErr: "", + }, + { + name: "invalid node window", + trackerConfig: &PlanRejectionTracker{ + NodeThreshold: 123, + }, + expectedErr: "node_window must be greater than 0", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + config := DevConfig(nil) + require.NoError(t, config.normalizeAddrs()) + + if tc.trackerConfig != nil { + config.Server.PlanRejectionTracker = tc.trackerConfig + } + + serverConfig, err := convertServerConfig(config) + + if tc.expectedErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tc.expectedErr) + } else { + require.NoError(t, err) + if tc.expectedConfig.Enabled != nil { + require.Equal(t, + *tc.expectedConfig.Enabled, + serverConfig.NodePlanRejectionEnabled, + ) + } + require.Equal(t, + tc.expectedConfig.NodeThreshold, + serverConfig.NodePlanRejectionThreshold, + ) + require.Equal(t, + tc.expectedConfig.NodeWindow, + serverConfig.NodePlanRejectionWindow, + ) + } + }) + } +} + func TestAgent_ServerConfig_RaftMultiplier_Ok(t *testing.T) { ci.Parallel(t) diff --git a/command/agent/config.go b/command/agent/config.go index d8487eb44..98b8b0648 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -516,6 +516,10 @@ type ServerConfig struct { // This value is ignored. DefaultSchedulerConfig *structs.SchedulerConfiguration `hcl:"default_scheduler_config"` + // PlanRejectionTracker configures the node plan rejection tracker that + // detects potentially bad nodes. + PlanRejectionTracker *PlanRejectionTracker `hcl:"plan_rejection_tracker"` + // EnableEventBroker configures whether this server's state store // will generate events for its event stream. EnableEventBroker *bool `hcl:"enable_event_broker"` @@ -561,6 +565,53 @@ type RaftBoltConfig struct { NoFreelistSync bool `hcl:"no_freelist_sync"` } +// PlanRejectionTracker is used in servers to configure the plan rejection +// tracker. +type PlanRejectionTracker struct { + // Enabled controls if the plan rejection tracker is active or not. 
+ Enabled *bool `hcl:"enabled"` + + // NodeThreshold is the number of times a node can have plan rejections + // before it is marked as ineligible. + NodeThreshold int `hcl:"node_threshold"` + + // NodeWindow is the time window used to track active plan rejections for + // nodes. + NodeWindow time.Duration + NodeWindowHCL string `hcl:"node_window" json:"-"` + + // ExtraKeysHCL is used by hcl to surface unexpected keys + ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"` +} + +func (p *PlanRejectionTracker) Merge(b *PlanRejectionTracker) *PlanRejectionTracker { + if p == nil { + return b + } + + result := *p + + if b == nil { + return &result + } + + if b.Enabled != nil { + result.Enabled = b.Enabled + } + + if b.NodeThreshold != 0 { + result.NodeThreshold = b.NodeThreshold + } + + if b.NodeWindow != 0 { + result.NodeWindow = b.NodeWindow + } + if b.NodeWindowHCL != "" { + result.NodeWindowHCL = b.NodeWindowHCL + } + return &result +} + // Search is used in servers to configure search API options. type Search struct { // FuzzyEnabled toggles whether the FuzzySearch API is enabled. If not @@ -998,6 +1049,11 @@ func DefaultConfig() *Config { EventBufferSize: helper.IntToPtr(100), RaftProtocol: 3, StartJoin: []string{}, + PlanRejectionTracker: &PlanRejectionTracker{ + Enabled: helper.BoolToPtr(false), + NodeThreshold: 100, + NodeWindow: 5 * time.Minute, + }, ServerJoin: &ServerJoin{ RetryJoin: []string{}, RetryInterval: 30 * time.Second, @@ -1608,6 +1664,10 @@ func (s *ServerConfig) Merge(b *ServerConfig) *ServerConfig { result.EventBufferSize = b.EventBufferSize } + if b.PlanRejectionTracker != nil { + result.PlanRejectionTracker = result.PlanRejectionTracker.Merge(b.PlanRejectionTracker) + } + if b.DefaultSchedulerConfig != nil { c := *b.DefaultSchedulerConfig result.DefaultSchedulerConfig = &c diff --git a/command/agent/config_parse.go b/command/agent/config_parse.go index 363db9e1e..04135f783 100644 --- a/command/agent/config_parse.go +++ b/command/agent/config_parse.go @@ -43,9 +43,12 @@ func ParseConfigFile(path string) (*Config, error) { VaultRetry: &client.RetryConfig{}, }, }, + Server: &ServerConfig{ + PlanRejectionTracker: &PlanRejectionTracker{}, + ServerJoin: &ServerJoin{}, + }, ACL: &ACLConfig{}, Audit: &config.AuditConfig{}, - Server: &ServerConfig{ServerJoin: &ServerJoin{}}, Consul: &config.ConsulConfig{}, Autopilot: &config.AutopilotConfig{}, Telemetry: &Telemetry{}, @@ -54,7 +57,7 @@ func ParseConfigFile(path string) (*Config, error) { err = hcl.Decode(c, buf.String()) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to decode HCL file %s: %w", path, err) } // convert strings to time.Durations @@ -66,6 +69,7 @@ func ParseConfigFile(path string) (*Config, error) { {"server.heartbeat_grace", &c.Server.HeartbeatGrace, &c.Server.HeartbeatGraceHCL, nil}, {"server.min_heartbeat_ttl", &c.Server.MinHeartbeatTTL, &c.Server.MinHeartbeatTTLHCL, nil}, {"server.failover_heartbeat_ttl", &c.Server.FailoverHeartbeatTTL, &c.Server.FailoverHeartbeatTTLHCL, nil}, + {"server.plan_rejection_tracker.node_window", &c.Server.PlanRejectionTracker.NodeWindow, &c.Server.PlanRejectionTracker.NodeWindowHCL, nil}, {"server.retry_interval", &c.Server.RetryInterval, &c.Server.RetryIntervalHCL, nil}, {"server.server_join.retry_interval", &c.Server.ServerJoin.RetryInterval, &c.Server.ServerJoin.RetryIntervalHCL, nil}, {"consul.timeout", &c.Consul.Timeout, &c.Consul.TimeoutHCL, nil}, diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go index 35edc6dbb..5e5a1399c 
100644 --- a/command/agent/config_parse_test.go +++ b/command/agent/config_parse_test.go @@ -126,6 +126,12 @@ var basicConfig = &Config{ EncryptKey: "abc", EnableEventBroker: helper.BoolToPtr(false), EventBufferSize: helper.IntToPtr(200), + PlanRejectionTracker: &PlanRejectionTracker{ + Enabled: helper.BoolToPtr(true), + NodeThreshold: 100, + NodeWindow: 41 * time.Minute, + NodeWindowHCL: "41m", + }, ServerJoin: &ServerJoin{ RetryJoin: []string{"1.1.1.1", "2.2.2.2"}, RetryInterval: time.Duration(15) * time.Second, @@ -543,6 +549,9 @@ func (c *Config) addDefaults() { if c.Server.ServerJoin == nil { c.Server.ServerJoin = &ServerJoin{} } + if c.Server.PlanRejectionTracker == nil { + c.Server.PlanRejectionTracker = &PlanRejectionTracker{} + } } // Tests for a panic parsing json with an object of exactly @@ -620,6 +629,11 @@ var sample0 = &Config{ RetryJoin: []string{"10.0.0.101", "10.0.0.102", "10.0.0.103"}, EncryptKey: "sHck3WL6cxuhuY7Mso9BHA==", ServerJoin: &ServerJoin{}, + PlanRejectionTracker: &PlanRejectionTracker{ + NodeThreshold: 100, + NodeWindow: 31 * time.Minute, + NodeWindowHCL: "31m", + }, }, ACL: &ACLConfig{ Enabled: true, @@ -710,6 +724,11 @@ var sample1 = &Config{ RetryJoin: []string{"10.0.0.101", "10.0.0.102", "10.0.0.103"}, EncryptKey: "sHck3WL6cxuhuY7Mso9BHA==", ServerJoin: &ServerJoin{}, + PlanRejectionTracker: &PlanRejectionTracker{ + NodeThreshold: 100, + NodeWindow: 31 * time.Minute, + NodeWindowHCL: "31m", + }, }, ACL: &ACLConfig{ Enabled: true, diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 36e263a31..2e98438f3 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -148,6 +148,11 @@ func TestConfig_Merge(t *testing.T) { UpgradeVersion: "foo", EnableEventBroker: helper.BoolToPtr(false), EventBufferSize: helper.IntToPtr(0), + PlanRejectionTracker: &PlanRejectionTracker{ + Enabled: helper.BoolToPtr(true), + NodeThreshold: 100, + NodeWindow: 11 * time.Minute, + }, }, ACL: &ACLConfig{ Enabled: true, @@ -343,6 +348,11 @@ func TestConfig_Merge(t *testing.T) { UpgradeVersion: "bar", EnableEventBroker: helper.BoolToPtr(true), EventBufferSize: helper.IntToPtr(100), + PlanRejectionTracker: &PlanRejectionTracker{ + Enabled: helper.BoolToPtr(true), + NodeThreshold: 100, + NodeWindow: 11 * time.Minute, + }, }, ACL: &ACLConfig{ Enabled: true, diff --git a/command/agent/testdata/basic.hcl b/command/agent/testdata/basic.hcl index 1f44fcf00..b9740de99 100644 --- a/command/agent/testdata/basic.hcl +++ b/command/agent/testdata/basic.hcl @@ -133,6 +133,12 @@ server { enable_event_broker = false event_buffer_size = 200 + plan_rejection_tracker { + enabled = true + node_threshold = 100 + node_window = "41m" + } + server_join { retry_join = ["1.1.1.1", "2.2.2.2"] retry_max = 3 diff --git a/command/agent/testdata/basic.json b/command/agent/testdata/basic.json index 71a653ca1..c5053b921 100644 --- a/command/agent/testdata/basic.json +++ b/command/agent/testdata/basic.json @@ -278,6 +278,11 @@ "node_gc_threshold": "12h", "non_voting_server": true, "num_schedulers": 2, + "plan_rejection_tracker": { + "enabled": true, + "node_threshold": 100, + "node_window": "41m" + }, "raft_protocol": 3, "raft_multiplier": 4, "redundancy_zone": "foo", diff --git a/command/agent/testdata/sample0.json b/command/agent/testdata/sample0.json index 1c450a32f..a7cc60f49 100644 --- a/command/agent/testdata/sample0.json +++ b/command/agent/testdata/sample0.json @@ -55,6 +55,10 @@ "bootstrap_expect": 3, "enabled": true, "encrypt": "sHck3WL6cxuhuY7Mso9BHA==", + 
"plan_rejection_tracker": { + "node_threshold": 100, + "node_window": "31m" + }, "retry_join": [ "10.0.0.101", "10.0.0.102", diff --git a/command/agent/testdata/sample1/sample0.json b/command/agent/testdata/sample1/sample0.json index 07796e913..a806ea909 100644 --- a/command/agent/testdata/sample1/sample0.json +++ b/command/agent/testdata/sample1/sample0.json @@ -20,6 +20,10 @@ "bootstrap_expect": 3, "enabled": true, "encrypt": "sHck3WL6cxuhuY7Mso9BHA==", + "plan_rejection_tracker": { + "node_threshold": 100, + "node_window": "31m" + }, "retry_join": [ "10.0.0.101", "10.0.0.102", diff --git a/nomad/config.go b/nomad/config.go index 7c67f1b38..615f014be 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -248,6 +248,17 @@ type Config struct { // additional delay is selected from this range randomly. EvalFailedFollowupDelayRange time.Duration + // NodePlanRejectionEnabled controls if node rejection tracker is enabled. + NodePlanRejectionEnabled bool + + // NodePlanRejectionThreshold is the number of times a node must have a + // plan rejection before it is set as ineligible. + NodePlanRejectionThreshold int + + // NodePlanRejectionWindow is the time window used to track plan + // rejections for nodes. + NodePlanRejectionWindow time.Duration + // MinHeartbeatTTL is the minimum time between heartbeats. // This is used as a floor to prevent excessive updates. MinHeartbeatTTL time.Duration @@ -415,6 +426,9 @@ func DefaultConfig() *Config { MaxHeartbeatsPerSecond: 50.0, HeartbeatGrace: 10 * time.Second, FailoverHeartbeatTTL: 300 * time.Second, + NodePlanRejectionEnabled: false, + NodePlanRejectionThreshold: 15, + NodePlanRejectionWindow: 10 * time.Minute, ConsulConfig: config.DefaultConsulConfig(), VaultConfig: config.DefaultVaultConfig(), RPCHoldTimeout: 5 * time.Second, diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index ced8198ef..b7b929fe6 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -25,20 +25,45 @@ type planner struct { // planQueue is used to manage the submitted allocation // plans that are waiting to be assessed by the leader planQueue *PlanQueue + + // badNodeTracker keeps a score for nodes that have plan rejections. + // Plan rejections are somewhat expected given Nomad's optimistic + // scheduling, but repeated rejections for the same node may indicate an + // undetected issue, so we need to track rejection history. + badNodeTracker BadNodeTracker } // newPlanner returns a new planner to be used for managing allocation plans. func newPlanner(s *Server) (*planner, error) { + log := s.logger.Named("planner") + // Create a plan queue planQueue, err := NewPlanQueue() if err != nil { return nil, err } + // Create the bad node tracker. + var badNodeTracker BadNodeTracker + if s.config.NodePlanRejectionEnabled { + config := DefaultCachedBadNodeTrackerConfig() + + config.Window = s.config.NodePlanRejectionWindow + config.Threshold = s.config.NodePlanRejectionThreshold + + badNodeTracker, err = NewCachedBadNodeTracker(log, config) + if err != nil { + return nil, err + } + } else { + badNodeTracker = &NoopBadNodeTracker{} + } + return &planner{ - Server: s, - log: s.logger.Named("planner"), - planQueue: planQueue, + Server: s, + log: log, + planQueue: planQueue, + badNodeTracker: badNodeTracker, }, nil } @@ -144,6 +169,13 @@ func (p *planner) planApply() { continue } + // Check if any of the rejected nodes should be made ineligible. 
+ for _, nodeID := range result.RejectedNodes { + if p.badNodeTracker.Add(nodeID) { + result.IneligibleNodes = append(result.IneligibleNodes, nodeID) + } + } + // Fast-path the response if there is nothing to do if result.IsNoOp() { pending.respond(result, nil) @@ -207,6 +239,8 @@ func (p *planner) snapshotMinIndex(prevPlanResultIndex, planSnapshotIndex uint64 // applyPlan is used to apply the plan result and to return the alloc index func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) { + now := time.Now().UTC().UnixNano() + // Setup the update request req := structs.ApplyPlanResultsRequest{ AllocUpdateRequest: structs.AllocUpdateRequest{ @@ -214,11 +248,12 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap }, Deployment: result.Deployment, DeploymentUpdates: result.DeploymentUpdates, + IneligibleNodes: result.IneligibleNodes, EvalID: plan.EvalID, + UpdatedAt: now, } preemptedJobIDs := make(map[structs.NamespacedID]struct{}) - now := time.Now().UTC().UnixNano() if ServersMeetMinimumVersion(p.Members(), MinVersionPlanNormalization, true) { // Initialize the allocs request using the new optimized log entry format. @@ -493,6 +528,7 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan // errors since we are processing in parallel. var mErr multierror.Error partialCommit := false + rejectedNodes := make(map[string]struct{}, 0) // handleResult is used to process the result of evaluateNodePlan handleResult := func(nodeID string, fit bool, reason string, err error) (cancel bool) { @@ -516,8 +552,11 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan "node_id", nodeID, "reason", reason, "eval_id", plan.EvalID, "namespace", plan.Job.Namespace) } - // Set that this is a partial commit + // Set that this is a partial commit and store the node that was + // rejected so the plan applier can detect repeated plan rejections + // for the same node. partialCommit = true + rejectedNodes[nodeID] = struct{}{} // If we require all-at-once scheduling, there is no point // to continue the evaluation, as we've already failed. @@ -622,6 +661,10 @@ OUTER: // placed but wasn't actually placed correctDeploymentCanaries(result) } + + for n := range rejectedNodes { + result.RejectedNodes = append(result.RejectedNodes, n) + } return result, mErr.ErrorOrNil() } diff --git a/nomad/plan_apply_node_tracker.go b/nomad/plan_apply_node_tracker.go new file mode 100644 index 000000000..48783fbc4 --- /dev/null +++ b/nomad/plan_apply_node_tracker.go @@ -0,0 +1,212 @@ +package nomad + +import ( + "fmt" + "time" + + metrics "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" + lru "github.com/hashicorp/golang-lru" + "github.com/hashicorp/nomad/helper" + "golang.org/x/time/rate" +) + +type BadNodeTracker interface { + Add(string) bool + EmitStats(time.Duration, <-chan struct{}) +} + +// NoopBadNodeTracker is a no-op implementation of bad node tracker that is +// used when tracking is disabled. +type NoopBadNodeTracker struct{} + +func (n *NoopBadNodeTracker) EmitStats(time.Duration, <-chan struct{}) {} +func (n *NoopBadNodeTracker) Add(string) bool { + return false +} + +// CachedBadNodeTracker keeps a record of nodes marked as bad by the plan +// applier in a LRU cache. +// +// It takes a time window and a threshold value. Plan rejections for a node +// will be registered with its timestamp. 
If the number of rejections within +// the time window is greater than the threshold the node is reported as bad. +// +// The tracker uses a fixed size cache that evicts old entries based on access +// frequency and recency. +type CachedBadNodeTracker struct { + logger hclog.Logger + cache *lru.TwoQueueCache + limiter *rate.Limiter + window time.Duration + threshold int +} + +type CachedBadNodeTrackerConfig struct { + CacheSize int + RateLimit float64 + BurstSize int + Window time.Duration + Threshold int +} + +func DefaultCachedBadNodeTrackerConfig() CachedBadNodeTrackerConfig { + return CachedBadNodeTrackerConfig{ + CacheSize: 50, + + // Limit marking 5 nodes per 30min as ineligible with an initial + // burst of 10 nodes. + RateLimit: 5.0 / (30 * 60), + BurstSize: 10, + + // Consider a node as bad if it is added more than 100 times in a 5min + // window period. + Window: 5 * time.Minute, + Threshold: 100, + } +} + +// NewCachedBadNodeTracker returns a new CachedBadNodeTracker. +func NewCachedBadNodeTracker(logger hclog.Logger, config CachedBadNodeTrackerConfig) (*CachedBadNodeTracker, error) { + log := logger.Named("bad_node_tracker"). + With("window", config.Window). + With("threshold", config.Threshold) + + cache, err := lru.New2Q(config.CacheSize) + if err != nil { + return nil, fmt.Errorf("failed to create new bad node tracker: %v", err) + } + + return &CachedBadNodeTracker{ + logger: log, + cache: cache, + limiter: rate.NewLimiter(rate.Limit(config.RateLimit), config.BurstSize), + window: config.Window, + threshold: config.Threshold, + }, nil +} + +// Add records a new rejection for a node and returns true if the number of +// rejections reaches the threshold. +// +// If it's the first time the node is added it will be included in the internal +// cache. If the cache is full the least recently updated or accessed node is +// evicted. +func (c *CachedBadNodeTracker) Add(nodeID string) bool { + value, ok := c.cache.Get(nodeID) + if !ok { + value = newBadNodeStats(nodeID, c.window) + c.cache.Add(nodeID, value) + } + stats := value.(*badNodeStats) + + now := time.Now() + stats.record(now) + + return c.isBad(now, stats) +} + +// EmitStats generates metrics for the bad nodes being currently tracked. Must +// be called in a goroutine. +func (c *CachedBadNodeTracker) EmitStats(period time.Duration, stopCh <-chan struct{}) { + timer, stop := helper.NewSafeTimer(period) + defer stop() + + for { + timer.Reset(period) + + select { + case <-timer.C: + c.emitStats() + case <-stopCh: + return + } + } +} + +// isBad returns true if the node has more rejections than the threshold within +// the time window. +func (c *CachedBadNodeTracker) isBad(t time.Time, stats *badNodeStats) bool { + score := stats.score(t) + logger := c.logger.With("node_id", stats.id, "score", score) + + logger.Trace("checking if node is bad") + if score >= c.threshold { + // Limit the number of nodes we report as bad to avoid mass assigning + // nodes as ineligible, but do it after Get to keep the cache entry + // fresh. 
+ if !c.limiter.Allow() { + logger.Trace("node is bad, but returning false due to rate limiting") + return false + } + return true + } + + return false +} + +func (c *CachedBadNodeTracker) emitStats() { + now := time.Now() + for _, k := range c.cache.Keys() { + value, _ := c.cache.Get(k) + stats := value.(*badNodeStats) + score := stats.score(now) + + labels := []metrics.Label{ + {Name: "node_id", Value: k.(string)}, + } + metrics.SetGaugeWithLabels([]string{"nomad", "plan", "rejection_tracker", "node_score"}, float32(score), labels) + } +} + +// badNodeStats represents a node being tracked by a BadNodeTracker. +type badNodeStats struct { + id string + history []time.Time + window time.Duration +} + +// newBadNodeStats returns an empty badNodeStats. +func newBadNodeStats(id string, window time.Duration) *badNodeStats { + return &badNodeStats{ + id: id, + window: window, + } +} + +// score returns the number of rejections within the past time window. +func (s *badNodeStats) score(t time.Time) int { + active, expired := s.countActive(t) + + // Remove expired records. + if expired > 0 { + s.history = s.history[expired:] + } + + return active +} + +// record adds a new entry to the stats history. +func (s *badNodeStats) record(t time.Time) { + s.history = append(s.history, t) +} + +// countActive returns the number of records that happened after the time +// window started (active) and before (expired). +func (s *badNodeStats) countActive(t time.Time) (int, int) { + windowStart := t.Add(-s.window) + + // Assume all values are expired and move back from history until we find + // a record that actually happened before the window started. + expired := len(s.history) + for ; expired > 0; expired-- { + i := expired - 1 + ts := s.history[i] + if ts.Before(windowStart) { + break + } + } + + active := len(s.history) - expired + return active, expired +} diff --git a/nomad/plan_apply_node_tracker_test.go b/nomad/plan_apply_node_tracker_test.go new file mode 100644 index 000000000..018335c4f --- /dev/null +++ b/nomad/plan_apply_node_tracker_test.go @@ -0,0 +1,164 @@ +package nomad + +import ( + "fmt" + "testing" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/ci" + "github.com/stretchr/testify/require" +) + +func TestCachedBadNodeTracker(t *testing.T) { + ci.Parallel(t) + + config := DefaultCachedBadNodeTrackerConfig() + config.CacheSize = 3 + + tracker, err := NewCachedBadNodeTracker(hclog.NewNullLogger(), config) + require.NoError(t, err) + + for i := 0; i < 10; i++ { + tracker.Add(fmt.Sprintf("node-%d", i+1)) + } + + require.Equal(t, config.CacheSize, tracker.cache.Len()) + + // Only track the most recent values. + expected := []string{"node-8", "node-9", "node-10"} + require.ElementsMatch(t, expected, tracker.cache.Keys()) +} + +func TestCachedBadNodeTracker_isBad(t *testing.T) { + ci.Parallel(t) + + config := DefaultCachedBadNodeTrackerConfig() + config.CacheSize = 3 + config.Threshold = 4 + + tracker, err := NewCachedBadNodeTracker(hclog.NewNullLogger(), config) + require.NoError(t, err) + + // Populate cache.
+ tracker.Add("node-1") + + tracker.Add("node-2") + tracker.Add("node-2") + + tracker.Add("node-3") + tracker.Add("node-3") + tracker.Add("node-3") + tracker.Add("node-3") + tracker.Add("node-3") + tracker.Add("node-3") + + testCases := []struct { + name string + nodeID string + bad bool + }{ + { + name: "node-1 is not bad", + nodeID: "node-1", + bad: false, + }, + { + name: "node-3 is bad", + nodeID: "node-3", + bad: true, + }, + } + + now := time.Now() + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Read value from the cache. + v, ok := tracker.cache.Get(tc.nodeID) + require.True(t, ok) + + // Check if it's bad. + stats := v.(*badNodeStats) + got := tracker.isBad(now, stats) + require.Equal(t, tc.bad, got) + }) + } + + future := time.Now().Add(2 * config.Window) + nodes := []string{"node-1", "node-2", "node-3"} + for _, n := range nodes { + t.Run(fmt.Sprintf("%s cache expires", n), func(t *testing.T) { + v, ok := tracker.cache.Get(n) + require.True(t, ok) + + stats := v.(*badNodeStats) + bad := tracker.isBad(future, stats) + require.False(t, bad) + }) + } +} + +func TestCachedBadNodeTracker_rateLimit(t *testing.T) { + ci.Parallel(t) + + config := DefaultCachedBadNodeTrackerConfig() + config.Threshold = 3 + config.RateLimit = float64(1) // Get a new token every second. + config.BurstSize = 3 + + tracker, err := NewCachedBadNodeTracker(hclog.NewNullLogger(), config) + require.NoError(t, err) + + tracker.Add("node-1") + tracker.Add("node-1") + tracker.Add("node-1") + tracker.Add("node-1") + tracker.Add("node-1") + + v, ok := tracker.cache.Get("node-1") + require.True(t, ok) + + stats := v.(*badNodeStats) + + // Burst allows for max 3 operations. + now := time.Now() + require.True(t, tracker.isBad(now, stats)) + require.True(t, tracker.isBad(now, stats)) + require.True(t, tracker.isBad(now, stats)) + require.False(t, tracker.isBad(now, stats)) + + // Wait for a new token. + time.Sleep(time.Second) + now = time.Now() + require.True(t, tracker.isBad(now, stats)) + require.False(t, tracker.isBad(now, stats)) +} + +func TestBadNodeStats_score(t *testing.T) { + ci.Parallel(t) + + window := time.Minute + stats := newBadNodeStats("node-1", window) + + now := time.Now() + require.Equal(t, 0, stats.score(now)) + + stats.record(now) + stats.record(now) + stats.record(now) + require.Equal(t, 3, stats.score(now)) + require.Len(t, stats.history, 3) + + halfWindow := now.Add(window / 2) + stats.record(halfWindow) + require.Equal(t, 4, stats.score(halfWindow)) + require.Len(t, stats.history, 4) + + fullWindow := now.Add(window).Add(time.Second) + require.Equal(t, 1, stats.score(fullWindow)) + require.Len(t, stats.history, 1) + + afterWindow := now.Add(2 * window) + require.Equal(t, 0, stats.score(afterWindow)) + require.Len(t, stats.history, 0) +} diff --git a/nomad/server.go b/nomad/server.go index decc909f5..ec4ca4c8e 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -481,6 +481,9 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr // Emit metrics for the plan queue go s.planQueue.EmitStats(time.Second, s.shutdownCh) + // Emit metrics for the planner's bad node tracker. + go s.planner.badNodeTracker.EmitStats(time.Second, s.shutdownCh) + // Emit metrics for the blocked eval tracker.
go s.blockedEvals.EmitStats(time.Second, s.shutdownCh) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 0aae2d2a2..87a348e1f 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -35,6 +35,17 @@ const ( ) const ( + // NodeEligibilityEventPlanRejectThreshold is the message used when the node + // is set to ineligible due to multiple plan failures. + // This is a preventive measure to signal scheduler workers to not consider + // the node for future placements. + // Plan rejections for a node are expected due to the optimistic and + // concurrent nature of the scheduling process, but repeated failures for + // the same node may indicate an underlying issue not detected by Nomad. + // The plan applier keeps track of plan rejection history and will mark + // nodes as ineligible if they cross a given threshold. + NodeEligibilityEventPlanRejectThreshold = "Node marked as ineligible for scheduling due to multiple plan rejections, refer to https://www.nomadproject.io/s/port-plan-failure for more information" + // NodeRegisterEventRegistered is the message used when the node becomes // registered. NodeRegisterEventRegistered = "Node registered" @@ -360,6 +371,21 @@ func (s *StateStore) UpsertPlanResults(msgType structs.MessageType, index uint64 txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() + // Mark nodes as ineligible. + for _, nodeID := range results.IneligibleNodes { + s.logger.Warn("marking node as ineligible due to multiple plan rejections, refer to https://www.nomadproject.io/s/port-plan-failure for more information", "node_id", nodeID) + + nodeEvent := structs.NewNodeEvent(). + SetSubsystem(structs.NodeEventSubsystemScheduler). + SetMessage(NodeEligibilityEventPlanRejectThreshold) + + err := s.updateNodeEligibilityImpl(index, nodeID, + structs.NodeSchedulingIneligible, results.UpdatedAt, nodeEvent, txn) + if err != nil { + return err + } + } + // Upsert the newly created or updated deployment if results.Deployment != nil { if err := s.upsertDeploymentImpl(index, results.Deployment, txn); err != nil { @@ -1137,10 +1163,15 @@ func (s *StateStore) updateNodeDrainImpl(txn *txn, index uint64, nodeID string, // UpdateNodeEligibility is used to update the scheduling eligibility of a node func (s *StateStore) UpdateNodeEligibility(msgType structs.MessageType, index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent) error { - txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() + if err := s.updateNodeEligibilityImpl(index, nodeID, eligibility, updatedAt, event, txn); err != nil { + return err + } + return txn.Commit() +} +func (s *StateStore) updateNodeEligibilityImpl(index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent, txn *txn) error { // Lookup the node existing, err := txn.First("nodes", "id", nodeID) if err != nil { @@ -1177,7 +1208,7 @@ func (s *StateStore) UpdateNodeEligibility(msgType structs.MessageType, index ui return fmt.Errorf("index update failed: %v", err) } - return txn.Commit() + return nil } // UpsertNodeEvents adds the node events to the nodes, rotating events as diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 75ed14609..800531e1b 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -931,6 +931,14 @@ type ApplyPlanResultsRequest struct { // PreemptionEvals is a slice of follow up evals for jobs whose allocations // have been preempted to place allocs in this plan PreemptionEvals []*Evaluation + + // 
IneligibleNodes are nodes the plan applier has repeatedly rejected + // placements for and should therefore be considered ineligible by workers + // to avoid retrying them repeatedly. + IneligibleNodes []string + + // UpdatedAt represents server time of receiving request. + UpdatedAt int64 } // AllocUpdateRequest is used to submit changes to allocations, either @@ -1650,6 +1658,7 @@ const ( NodeEventSubsystemDriver = "Driver" NodeEventSubsystemHeartbeat = "Heartbeat" NodeEventSubsystemCluster = "Cluster" + NodeEventSubsystemScheduler = "Scheduler" NodeEventSubsystemStorage = "Storage" ) @@ -11477,6 +11486,16 @@ type PlanResult struct { // as stopped. NodePreemptions map[string][]*Allocation + // RejectedNodes are nodes the scheduler worker has rejected placements for + // and should be considered for ineligibility by the plan applier to avoid + // retrying them repeatedly. + RejectedNodes []string + + // IneligibleNodes are nodes the plan applier has repeatedly rejected + // placements for and should therefore be considered ineligible by workers + // to avoid retrying them repeatedly. + IneligibleNodes []string + // RefreshIndex is the index the worker should refresh state up to. // This allows all evictions and allocations to be materialized. // If any allocations were rejected due to stale data (node state, @@ -11490,8 +11509,9 @@ type PlanResult struct { // IsNoOp checks if this plan result would do nothing func (p *PlanResult) IsNoOp() bool { - return len(p.NodeUpdate) == 0 && len(p.NodeAllocation) == 0 && - len(p.DeploymentUpdates) == 0 && p.Deployment == nil + return len(p.IneligibleNodes) == 0 && len(p.NodeUpdate) == 0 && + len(p.NodeAllocation) == 0 && len(p.DeploymentUpdates) == 0 && + p.Deployment == nil } // FullCommit is used to check if all the allocations in a plan diff --git a/website/content/docs/configuration/server.mdx b/website/content/docs/configuration/server.mdx index a3a64ca46..e139f5c46 100644 --- a/website/content/docs/configuration/server.mdx +++ b/website/content/docs/configuration/server.mdx @@ -156,6 +156,10 @@ server { disallow this server from making any scheduling decisions. This defaults to the number of CPU cores. +- `plan_rejection_tracker` ([PlanRejectionTracker](#plan_rejection_tracker-parameters)) - + Configuration for the plan rejection tracker that the Nomad leader uses to + track the history of plan rejections. + - `raft_boltdb` - This is a nested object that allows configuring options for Raft's BoltDB based log store. - `no_freelist_sync` - Setting this to `true` will disable syncing the BoltDB @@ -239,6 +243,30 @@ server { section for more information on the format of the string. This field is deprecated in favor of the [server_join stanza][server-join]. +### `plan_rejection_tracker` Parameters + +The leader plan rejection tracker can be adjusted to prevent evaluations from getting stuck because they are repeatedly scheduled onto a client that has an undetected issue. Refer to [Monitoring Nomad][monitoring_nomad_progress] for more details. + +- `enabled` `(bool: false)` - Specifies if plan rejections should be tracked. + +- `node_threshold` `(int: 100)` - The number of plan rejections for a node within the `node_window` before the client is marked as ineligible. + +- `node_window` `(string: "5m")` - The time window within which plan rejections for a node are considered. + +If you observe too many false positives (clients marked as ineligible even though they don't have any problem), you may want to increase `node_threshold`.
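To make the interaction between `node_threshold` and `node_window` concrete, the following sketch scores a node by counting only the rejections that fall inside the trailing window, which is essentially how the tracker decides; it is an illustration of the semantics under the default values, not the actual implementation.

```go
package main

import (
	"fmt"
	"time"
)

// wouldMarkIneligible reports whether a series of plan rejection timestamps
// crosses the threshold, counting only rejections inside the trailing window.
func wouldMarkIneligible(rejections []time.Time, now time.Time, window time.Duration, threshold int) bool {
	start := now.Add(-window)
	score := 0
	for _, ts := range rejections {
		if ts.After(start) {
			score++
		}
	}
	return score >= threshold
}

func main() {
	now := time.Now()
	window := 5 * time.Minute // node_window default
	threshold := 100          // node_threshold default

	// 99 recent rejections plus one from an hour ago: the old rejection has
	// aged out of the window, so the node is not marked.
	rejections := []time.Time{now.Add(-time.Hour)}
	for i := 0; i < 99; i++ {
		rejections = append(rejections, now.Add(-time.Minute))
	}
	fmt.Println(wouldMarkIneligible(rejections, now, window, threshold)) // false

	// One more recent rejection crosses the threshold.
	rejections = append(rejections, now)
	fmt.Println(wouldMarkIneligible(rejections, now, window, threshold)) // true
}
```

In other words, with the defaults a client needs to accumulate 100 rejections within a single 5 minute span before it is marked ineligible.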
+ +Conversely, if you notice jobs not being scheduled due to plan rejections for the same `node_id` but the client is not being set as ineligible, you can try increasing `node_window` so that more historical rejections are taken into account. + ## `server` Examples ### Common Setup @@ -331,5 +359,6 @@ server { [update-scheduler-config]: /api-docs/operator/scheduler#update-scheduler-configuration 'Scheduler Config' [bootstrapping a cluster]: /docs/faq#bootstrapping [rfc4648]: https://tools.ietf.org/html/rfc4648#section-5 +[monitoring_nomad_progress]: /docs/operations/monitoring-nomad#progress [`nomad operator keygen`]: /docs/commands/operator/keygen [search]: /docs/configuration/search diff --git a/website/content/docs/operations/metrics-reference.mdx b/website/content/docs/operations/metrics-reference.mdx index 746097bcf..926792810 100644 --- a/website/content/docs/operations/metrics-reference.mdx +++ b/website/content/docs/operations/metrics-reference.mdx @@ -394,6 +394,7 @@ those listed in [Key Metrics](#key-metrics) above. | `nomad.nomad.plan.apply` | Time elapsed to apply a plan | Nanoseconds | Summary | host | | `nomad.nomad.plan.evaluate` | Time elapsed to evaluate a plan | Nanoseconds | Summary | host | | `nomad.nomad.plan.node_rejected` | Number of times a node has had a plan rejected | Integer | Counter | host, node_id | +| `nomad.nomad.plan.rejection_tracker.node_score` | Number of times a node has had a plan rejected within the tracker window | Integer | Gauge | host, node_id | | `nomad.nomad.plan.queue_depth` | Count of evals in the plan queue | Integer | Gauge | host | | `nomad.nomad.plan.submit` | Time elapsed for `Plan.Submit` RPC call | Nanoseconds | Summary | host | | `nomad.nomad.plan.wait_for_index` | Time elapsed that planner waits for the raft index of the plan to be processed | Nanoseconds | Summary | host | diff --git a/website/content/docs/operations/monitoring-nomad.mdx b/website/content/docs/operations/monitoring-nomad.mdx index ac3e60627..6dedbb514 100644 --- a/website/content/docs/operations/monitoring-nomad.mdx +++ b/website/content/docs/operations/monitoring-nomad.mdx @@ -149,10 +149,34 @@ While it is possible for these log lines to occur infrequently due to normal cluster conditions, they should not appear repeatedly and prevent the job from eventually running (look up the evaluation ID logged to find the job). -If this log *does* appear repeatedly with the same `node_id` referenced, try +#### Plan rejection tracker + +Nomad provides a mechanism to track the history of plan rejections per client +and mark them as ineligible if the number goes above a given threshold within a +time window. This functionality can be enabled using the +[`plan_rejection_tracker`] server configuration. + +When a node is marked as ineligible due to excessive plan rejections, the +following node event is registered: + +``` +Node marked as ineligible for scheduling due to multiple plan rejections, refer to https://www.nomadproject.io/s/port-plan-failure for more information +``` + +Along with the log line: + +``` +[WARN] nomad.state_store: marking node as ineligible due to multiple plan rejections: node_id=67af2541-5e96-6f54-9095-11089d627626 +``` + +If a client is marked as ineligible due to repeated plan rejections, try [draining] the node and shutting it down. Misconfigurations not caught by validation can cause nodes to enter this state: [#11830][gh-11830].
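When investigating a client that was marked ineligible, it can help to confirm the scheduler node event and the node's current eligibility before deciding whether to drain it. Below is a small sketch using the official Go API client (`github.com/hashicorp/nomad/api`); the node ID is a placeholder taken from the example log line above.

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Placeholder: use the node_id reported in the warning log line.
	nodeID := "67af2541-5e96-6f54-9095-11089d627626"

	node, _, err := client.Nodes().Info(nodeID, nil)
	if err != nil {
		log.Fatal(err)
	}

	// The plan rejection tracker records its ineligibility event under the
	// "Scheduler" subsystem.
	for _, ev := range node.Events {
		if ev.Subsystem == "Scheduler" {
			fmt.Println(ev.Timestamp, ev.Message)
		}
	}

	fmt.Println("scheduling eligibility:", node.SchedulingEligibility)
}
```

Once the underlying issue is addressed, eligibility can be restored with `nomad node eligibility -enable <node_id>`.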
+If the `plan for node rejected` log *does* appear repeatedly with the same `node_id` referenced but the client is not being set as ineligible, you can try adjusting the servers' [`plan_rejection_tracker`] configuration. + ### Performance The following metrics allow observing changes in throughput at the various @@ -278,6 +302,7 @@ latency and packet loss for the [Serf] address. [metric-types]: /docs/telemetry/metrics#metric-types [metrics-api-endpoint]: /api-docs/metrics [prometheus-telem]: /docs/configuration/telemetry#prometheus +[`plan_rejection_tracker`]: /docs/configuration/server#plan_rejection_tracker [serf]: /docs/configuration#serf-1 [statsd-exporter]: https://github.com/prometheus/statsd_exporter [statsd-telem]: /docs/configuration/telemetry#statsd