Merge pull request #827 from hashicorp/f-dynamic-pool

Set the size of the evaluation pool based on cores
This commit is contained in:
Armon Dadgar 2016-02-20 15:56:50 -08:00
commit 2d05f8cef7
5 changed files with 90 additions and 18 deletions

View file

@ -101,8 +101,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
// Disable workers to free half the cores for use in the plan queue and
// evaluation broker
if numWorkers := len(s.workers); numWorkers > 1 {
// Disabling half the workers frees half the CPUs.
for i := 0; i < numWorkers/2; i++ {
// Disabling 3/4 of the workers frees CPU for raft and the
// plan applier which uses 1/2 the cores.
for i := 0; i < (3 * numWorkers / 4); i++ {
s.workers[i].SetPause(true)
}
}

View file

@ -2,6 +2,7 @@ package nomad
import (
"fmt"
"runtime"
"time"
"github.com/armon/go-metrics"
@ -42,7 +43,13 @@ func (s *Server) planApply() {
// holds an optimistic state which includes that plan application.
var waitCh chan struct{}
var snap *state.StateSnapshot
pool := NewEvaluatePool(workerPoolSize, workerPoolBufferSize)
// Setup a worker pool with half the cores, with at least 1
poolSize := runtime.NumCPU() / 2
if poolSize == 0 {
poolSize = 1
}
pool := NewEvaluatePool(poolSize, workerPoolBufferSize)
defer pool.Shutdown()
for {

View file

@ -6,9 +6,6 @@ import (
)
const (
// workerPoolSize is the size of the worker pool
workerPoolSize = 2
// workerPoolBufferSize is the size of the buffers used to push
// request to the workers and to collect the responses. It should
// be large enough just to keep things busy
@ -19,9 +16,10 @@ const (
// if a plan is valid. It can be used to parallelize the evaluation
// of a plan.
type EvaluatePool struct {
	// workers is the current number of worker goroutines.
	workers int

	// workerStop holds one stop channel per worker; closing a
	// channel tells the corresponding worker to exit.
	workerStop []chan struct{}

	// req carries evaluation requests to the workers and res
	// carries their results back.
	req chan evaluateRequest
	res chan evaluateResult
}
type evaluateRequest struct {
@ -39,16 +37,51 @@ type evaluateResult struct {
// NewEvaluatePool returns a pool of the given size. Each worker gets
// its own stop channel so the pool can later be resized with SetSize.
func NewEvaluatePool(workers, bufSize int) *EvaluatePool {
	p := &EvaluatePool{
		workers:    workers,
		workerStop: make([]chan struct{}, workers),
		req:        make(chan evaluateRequest, bufSize),
		res:        make(chan evaluateResult, bufSize),
	}
	for i := 0; i < workers; i++ {
		stopCh := make(chan struct{})
		p.workerStop[i] = stopCh
		go p.run(stopCh)
	}
	return p
}
// Size returns the current number of workers in the pool.
func (p *EvaluatePool) Size() int {
	return p.workers
}
// SetSize is used to resize the worker pool. Growing spawns the
// additional workers; shrinking closes the stop channels of the
// surplus workers so they exit. A negative size is treated as zero.
func (p *EvaluatePool) SetSize(size int) {
	// Clamp negative sizes to zero.
	if size < 0 {
		size = 0
	}

	// Growing (or no change): launch workers until we reach the
	// requested size, tracking a stop channel for each.
	if size >= p.workers {
		for len(p.workerStop) < size {
			quit := make(chan struct{})
			p.workerStop = append(p.workerStop, quit)
			go p.run(quit)
		}
		p.workers = size
		return
	}

	// Shrinking: signal each surplus worker to stop, then drop the
	// dead stop channels from the slice.
	for idx := size; idx < p.workers; idx++ {
		close(p.workerStop[idx])
		p.workerStop[idx] = nil
	}
	p.workerStop = p.workerStop[:size]
	p.workers = size
}
// RequestCh is used to push requests
func (p *EvaluatePool) RequestCh() chan<- evaluateRequest {
return p.req
@ -61,13 +94,19 @@ func (p *EvaluatePool) ResultCh() <-chan evaluateResult {
// Shutdown is used to shutdown the pool by stopping every worker.
// Note: we deliberately do NOT close p.req here — workers select on a
// receive from p.req, and a closed channel would deliver zero-value
// requests in a tight loop while racing the stop signal. Resizing to
// zero cleanly terminates each worker via its stop channel instead.
func (p *EvaluatePool) Shutdown() {
	p.SetSize(0)
}
// run is a long running go routine per worker
func (p *EvaluatePool) run() {
for req := range p.req {
fit, err := evaluateNodePlan(req.snap, req.plan, req.nodeID)
p.res <- evaluateResult{req.nodeID, fit, err}
func (p *EvaluatePool) run(stopCh chan struct{}) {
for {
select {
case req := <-p.req:
fit, err := evaluateNodePlan(req.snap, req.plan, req.nodeID)
p.res <- evaluateResult{req.nodeID, fit, err}
case <-stopCh:
return
}
}
}

View file

@ -38,3 +38,23 @@ func TestEvaluatePool(t *testing.T) {
t.Fatalf("bad")
}
}
// TestEvaluatePool_Resize verifies that SetSize grows and shrinks the
// pool and that Size reports the updated worker count each time.
func TestEvaluatePool_Resize(t *testing.T) {
	p := NewEvaluatePool(1, 4)
	defer p.Shutdown()

	// The constructor determines the initial size.
	if got := p.Size(); got != 1 {
		t.Fatalf("bad: %d", got)
	}

	// Scale up.
	p.SetSize(4)
	if got := p.Size(); got != 4 {
		t.Fatalf("bad: %d", got)
	}

	// Scale down.
	p.SetSize(2)
	if got := p.Size(); got != 2 {
		t.Fatalf("bad: %d", got)
	}
}

View file

@ -10,6 +10,11 @@ import (
"github.com/hashicorp/raft"
)
const (
// workerPoolSize is the size of the worker pool
workerPoolSize = 2
)
// planWaitFuture is used to wait for the Raft future to complete
func planWaitFuture(future raft.ApplyFuture) (uint64, error) {
if err := future.Error(); err != nil {