267 lines
5.8 KiB
Go
267 lines
5.8 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package nomad
|
|
|
|
import (
|
|
"container/heap"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
metrics "github.com/armon/go-metrics"
|
|
"github.com/hashicorp/nomad/helper"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
)
|
|
|
|
// planQueueFlushed is the error delivered to every plan that is still
// pending when the queue is flushed, which also happens when the queue
// is disabled.
var planQueueFlushed = fmt.Errorf("plan queue flushed")
|
|
|
|
// PlanFuture is used to return a future for an enqueue
|
|
type PlanFuture interface {
|
|
Wait() (*structs.PlanResult, error)
|
|
}
|
|
|
|
// PlanQueue is used to submit commit plans for task allocations
|
|
// to the current leader. The leader verifies that resources are not
|
|
// over-committed and commits to Raft. This allows sub-schedulers to
|
|
// be optimistically concurrent. In the case of an overcommit, the plan
|
|
// may be partially applied if allowed, or completely rejected (gang commit).
|
|
type PlanQueue struct {
|
|
enabled bool
|
|
stats *QueueStats
|
|
|
|
ready PendingPlans
|
|
waitCh chan struct{}
|
|
|
|
l sync.RWMutex
|
|
}
|
|
|
|
// NewPlanQueue is used to construct and return a new plan queue
|
|
func NewPlanQueue() (*PlanQueue, error) {
|
|
q := &PlanQueue{
|
|
enabled: false,
|
|
stats: new(QueueStats),
|
|
ready: make([]*pendingPlan, 0, 16),
|
|
waitCh: make(chan struct{}, 1),
|
|
}
|
|
return q, nil
|
|
}
|
|
|
|
// pendingPlan is used to wrap a plan that is enqueued
|
|
// so that we can re-use it as a future.
|
|
type pendingPlan struct {
|
|
plan *structs.Plan
|
|
enqueueTime time.Time
|
|
result *structs.PlanResult
|
|
errCh chan error
|
|
}
|
|
|
|
// Wait is used to block for the plan result or potential error
|
|
func (p *pendingPlan) Wait() (*structs.PlanResult, error) {
|
|
err := <-p.errCh
|
|
return p.result, err
|
|
}
|
|
|
|
// respond is used to set the response and error for the future
|
|
func (p *pendingPlan) respond(result *structs.PlanResult, err error) {
|
|
p.result = result
|
|
p.errCh <- err
|
|
}
|
|
|
|
// PendingPlans is a list of waiting plans.
|
|
// We implement the container/heap interface so that this is a
|
|
// priority queue
|
|
type PendingPlans []*pendingPlan
|
|
|
|
// Enabled is used to check if the queue is enabled.
|
|
func (q *PlanQueue) Enabled() bool {
|
|
q.l.RLock()
|
|
defer q.l.RUnlock()
|
|
return q.enabled
|
|
}
|
|
|
|
// SetEnabled is used to control if the queue is enabled. The queue
|
|
// should only be enabled on the active leader.
|
|
func (q *PlanQueue) SetEnabled(enabled bool) {
|
|
q.l.Lock()
|
|
q.enabled = enabled
|
|
q.l.Unlock()
|
|
if !enabled {
|
|
q.Flush()
|
|
}
|
|
}
|
|
|
|
// Enqueue is used to enqueue a plan
|
|
func (q *PlanQueue) Enqueue(plan *structs.Plan) (PlanFuture, error) {
|
|
q.l.Lock()
|
|
defer q.l.Unlock()
|
|
|
|
// Do nothing if not enabled
|
|
if !q.enabled {
|
|
return nil, fmt.Errorf("plan queue is disabled")
|
|
}
|
|
|
|
// Wrap the pending plan
|
|
pending := &pendingPlan{
|
|
plan: plan,
|
|
enqueueTime: time.Now(),
|
|
errCh: make(chan error, 1),
|
|
}
|
|
|
|
// Push onto the heap
|
|
heap.Push(&q.ready, pending)
|
|
|
|
// Update the stats
|
|
q.stats.Depth += 1
|
|
|
|
// Unblock any blocked reader
|
|
select {
|
|
case q.waitCh <- struct{}{}:
|
|
default:
|
|
}
|
|
return pending, nil
|
|
}
|
|
|
|
// Dequeue is used to perform a blocking dequeue
|
|
func (q *PlanQueue) Dequeue(timeout time.Duration) (*pendingPlan, error) {
|
|
SCAN:
|
|
q.l.Lock()
|
|
|
|
// Do nothing if not enabled
|
|
if !q.enabled {
|
|
q.l.Unlock()
|
|
return nil, fmt.Errorf("plan queue is disabled")
|
|
}
|
|
|
|
// Look for available work
|
|
if len(q.ready) > 0 {
|
|
raw := heap.Pop(&q.ready)
|
|
pending := raw.(*pendingPlan)
|
|
q.stats.Depth -= 1
|
|
q.l.Unlock()
|
|
return pending, nil
|
|
}
|
|
q.l.Unlock()
|
|
|
|
// Setup the timeout timer
|
|
var timerCh <-chan time.Time
|
|
if timerCh == nil && timeout > 0 {
|
|
timer := time.NewTimer(timeout)
|
|
defer timer.Stop()
|
|
timerCh = timer.C
|
|
}
|
|
|
|
// Wait for timeout or new work
|
|
select {
|
|
case <-q.waitCh:
|
|
goto SCAN
|
|
case <-timerCh:
|
|
return nil, nil
|
|
}
|
|
}
|
|
|
|
// Flush is used to reset the state of the plan queue
|
|
func (q *PlanQueue) Flush() {
|
|
q.l.Lock()
|
|
defer q.l.Unlock()
|
|
|
|
// Error out all the futures
|
|
for _, pending := range q.ready {
|
|
pending.respond(nil, planQueueFlushed)
|
|
}
|
|
|
|
// Reset the broker
|
|
q.stats.Depth = 0
|
|
q.ready = make([]*pendingPlan, 0, 16)
|
|
|
|
// Unblock any waiters
|
|
select {
|
|
case q.waitCh <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// Stats is used to query the state of the queue
|
|
func (q *PlanQueue) Stats() *QueueStats {
|
|
// Allocate a new stats struct
|
|
stats := new(QueueStats)
|
|
|
|
q.l.RLock()
|
|
defer q.l.RUnlock()
|
|
|
|
// Copy all the stats
|
|
*stats = *q.stats
|
|
return stats
|
|
}
|
|
|
|
// EmitStats is used to export metrics about the broker while enabled
|
|
func (q *PlanQueue) EmitStats(period time.Duration, stopCh <-chan struct{}) {
|
|
timer, stop := helper.NewSafeTimer(period)
|
|
defer stop()
|
|
|
|
for {
|
|
timer.Reset(period)
|
|
|
|
select {
|
|
case <-timer.C:
|
|
stats := q.Stats()
|
|
metrics.SetGauge([]string{"nomad", "plan", "queue_depth"}, float32(stats.Depth))
|
|
case <-stopCh:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// QueueStats holds the statistics reported for the plan queue.
type QueueStats struct {
	// Depth is the number of plans currently waiting in the queue.
	Depth int
}
|
|
|
|
// Len is for the sorting interface
|
|
func (p PendingPlans) Len() int {
|
|
return len(p)
|
|
}
|
|
|
|
// Less is for the sorting interface. We flip the check
|
|
// so that the "min" in the min-heap is the element with the
|
|
// highest priority. For the same priority, we use the enqueue
|
|
// time of the evaluation to give a FIFO ordering.
|
|
func (p PendingPlans) Less(i, j int) bool {
|
|
if p[i].plan.Priority != p[j].plan.Priority {
|
|
return !(p[i].plan.Priority < p[j].plan.Priority)
|
|
}
|
|
return p[i].enqueueTime.Before(p[j].enqueueTime)
|
|
}
|
|
|
|
// Swap is for the sorting interface
|
|
func (p PendingPlans) Swap(i, j int) {
|
|
p[i], p[j] = p[j], p[i]
|
|
}
|
|
|
|
// Push is used to add a new evaluation to the slice
|
|
func (p *PendingPlans) Push(e interface{}) {
|
|
*p = append(*p, e.(*pendingPlan))
|
|
}
|
|
|
|
// Pop is used to remove an evaluation from the slice
|
|
func (p *PendingPlans) Pop() interface{} {
|
|
n := len(*p)
|
|
e := (*p)[n-1]
|
|
(*p)[n-1] = nil
|
|
*p = (*p)[:n-1]
|
|
return e
|
|
}
|
|
|
|
// Peek is used to peek at the next element that would be popped
|
|
func (p PendingPlans) Peek() *pendingPlan {
|
|
n := len(p)
|
|
if n == 0 {
|
|
return nil
|
|
}
|
|
return p[n-1]
|
|
}
|