diff --git a/nomad/leader.go b/nomad/leader.go index a2281fbc8..5d5c417b0 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -206,12 +206,8 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Disable workers to free half the cores for use in the plan queue and // evaluation broker - if numWorkers := len(s.workers); numWorkers > 1 { - // Disabling 3/4 of the workers frees CPU for raft and the - // plan applier which uses 1/2 the cores. - for i := 0; i < (3 * numWorkers / 4); i++ { - s.workers[i].SetPause(true) - } + for _, w := range s.pausableWorkers() { + w.SetPause(true) } // Initialize and start the autopilot routine @@ -913,14 +909,29 @@ func (s *Server) revokeLeadership() error { } // Unpause our worker if we paused previously - if len(s.workers) > 1 { - for i := 0; i < len(s.workers)/2; i++ { - s.workers[i].SetPause(false) - } + for _, w := range s.pausableWorkers() { + w.SetPause(false) } + return nil } +// pausableWorkers returns a slice of the workers +// to pause on leader transitions. +// +// Upon leadership establishment, pause workers to free half +// the cores for use in the plan queue and evaluation broker +func (s *Server) pausableWorkers() []*Worker { + n := len(s.workers) + if n <= 1 { + return []*Worker{} + } + + // Disabling 3/4 of the workers frees CPU for raft and the + // plan applier which uses 1/2 the cores. + return s.workers[:3*n/4] +} + // reconcile is used to reconcile the differences between Serf // membership and what is reflected in our strongly consistent store. func (s *Server) reconcile() error { diff --git a/nomad/leader_test.go b/nomad/leader_test.go index 03333f639..d88c8aac4 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -1325,6 +1325,37 @@ func TestLeader_TransitionsUpdateConsistencyRead(t *testing.T) { require.True(t, s1.isReadyForConsistentReads()) } +// TestLeader_PausingWorkers asserts that scheduling workers are paused +// (and unpaused) upon leader elections (and step downs). +func TestLeader_PausingWorkers(t *testing.T) { + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 12 + }) + defer cleanupS1() + testutil.WaitForLeader(t, s1.RPC) + require.Len(t, s1.workers, 12) + + pausedWorkers := func() int { + c := 0 + for _, w := range s1.workers { + w.pauseLock.Lock() + if w.paused { + c++ + } + w.pauseLock.Unlock() + } + return c + } + + // pause 3/4 of the workers + require.Equal(t, 9, pausedWorkers()) + + err := s1.revokeLeadership() + require.NoError(t, err) + + require.Zero(t, pausedWorkers()) +} + // Test doing an inplace upgrade on a server from raft protocol 2 to 3 // This verifies that removing the server and adding it back with a uuid works // even if the server's address stays the same.