Merge pull request #8089 from hashicorp/b-leader-worker-count
leadership: pause and unpause workers consistently
This commit is contained in:
commit
bf7a3583e5
|
@ -206,12 +206,8 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
|
|||
|
||||
// Disable workers to free half the cores for use in the plan queue and
|
||||
// evaluation broker
|
||||
if numWorkers := len(s.workers); numWorkers > 1 {
|
||||
// Disabling 3/4 of the workers frees CPU for raft and the
|
||||
// plan applier which uses 1/2 the cores.
|
||||
for i := 0; i < (3 * numWorkers / 4); i++ {
|
||||
s.workers[i].SetPause(true)
|
||||
}
|
||||
for _, w := range s.pausableWorkers() {
|
||||
w.SetPause(true)
|
||||
}
|
||||
|
||||
// Initialize and start the autopilot routine
|
||||
|
@ -913,14 +909,29 @@ func (s *Server) revokeLeadership() error {
|
|||
}
|
||||
|
||||
// Unpause our workers if we paused them previously
|
||||
if len(s.workers) > 1 {
|
||||
for i := 0; i < len(s.workers)/2; i++ {
|
||||
s.workers[i].SetPause(false)
|
||||
}
|
||||
for _, w := range s.pausableWorkers() {
|
||||
w.SetPause(false)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// pausableWorkers returns a slice of the workers
|
||||
// to pause on leader transitions.
|
||||
//
|
||||
// Upon leadership establishment, pause workers to free half
|
||||
// the cores for use in the plan queue and evaluation broker
|
||||
func (s *Server) pausableWorkers() []*Worker {
|
||||
n := len(s.workers)
|
||||
if n <= 1 {
|
||||
return []*Worker{}
|
||||
}
|
||||
|
||||
// Disabling 3/4 of the workers frees CPU for raft and the
|
||||
// plan applier which uses 1/2 the cores.
|
||||
return s.workers[:3*n/4]
|
||||
}
|
||||
|
||||
// reconcile is used to reconcile the differences between Serf
|
||||
// membership and what is reflected in our strongly consistent store.
|
||||
func (s *Server) reconcile() error {
|
||||
|
|
|
@ -1325,6 +1325,37 @@ func TestLeader_TransitionsUpdateConsistencyRead(t *testing.T) {
|
|||
require.True(t, s1.isReadyForConsistentReads())
|
||||
}
|
||||
|
||||
// TestLeader_PausingWorkers asserts that scheduling workers are paused
|
||||
// (and unpaused) upon leader elections (and step downs).
|
||||
func TestLeader_PausingWorkers(t *testing.T) {
|
||||
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
||||
c.NumSchedulers = 12
|
||||
})
|
||||
defer cleanupS1()
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
require.Len(t, s1.workers, 12)
|
||||
|
||||
pausedWorkers := func() int {
|
||||
c := 0
|
||||
for _, w := range s1.workers {
|
||||
w.pauseLock.Lock()
|
||||
if w.paused {
|
||||
c++
|
||||
}
|
||||
w.pauseLock.Unlock()
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
// pause 3/4 of the workers
|
||||
require.Equal(t, 9, pausedWorkers())
|
||||
|
||||
err := s1.revokeLeadership()
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Zero(t, pausedWorkers())
|
||||
}
|
||||
|
||||
// Test doing an inplace upgrade on a server from raft protocol 2 to 3
|
||||
// This verifies that removing the server and adding it back with a uuid works
|
||||
// even if the server's address stays the same.
|
||||
|
|
Loading…
Reference in a new issue