nomad: eval broker serializes by JobID

This commit is contained in:
Armon Dadgar 2015-08-05 17:55:15 -07:00
parent 078d80432a
commit 011484ea74
4 changed files with 213 additions and 9 deletions

View file

@ -27,10 +27,22 @@ type EvalBroker struct {
enabled bool
stats *BrokerStats
evals map[string]*structs.Evaluation
// evals tracks queued evaluations by ID to de-duplicate enqueue
evals map[string]struct{}
ready map[string]PendingEvaluations
unack map[string]*unackEval
// jobEvals tracks queued evaluations by JobID to serialize them
jobEvals map[string]string
// blocked tracks the blocked evaluations by JobID in a priority queue
blocked map[string]PendingEvaluations
// ready tracks the ready jobs by scheduler in a priority queue
ready map[string]PendingEvaluations
// unack is a map of evalID to an un-acknowledged evaluation
unack map[string]*unackEval
// waiting is used to notify on a per-scheduler basis of ready work
waiting map[string]chan struct{}
l sync.RWMutex
@ -58,7 +70,9 @@ func NewEvalBroker(timeout time.Duration) (*EvalBroker, error) {
nackTimeout: timeout,
enabled: false,
stats: new(BrokerStats),
evals: make(map[string]*structs.Evaluation),
evals: make(map[string]struct{}),
jobEvals: make(map[string]string),
blocked: make(map[string]PendingEvaluations),
ready: make(map[string]PendingEvaluations),
unack: make(map[string]*unackEval),
waiting: make(map[string]chan struct{}),
@ -94,7 +108,7 @@ func (b *EvalBroker) Enqueue(eval *structs.Evaluation) error {
if _, ok := b.evals[eval.ID]; ok {
return nil
} else if b.enabled {
b.evals[eval.ID] = eval
b.evals[eval.ID] = struct{}{}
}
return b.enqueueLocked(eval)
@ -107,6 +121,18 @@ func (b *EvalBroker) enqueueLocked(eval *structs.Evaluation) error {
return nil
}
// Check if there is an evaluation for this JobID pending
pendingEval := b.jobEvals[eval.JobID]
if pendingEval == "" {
b.jobEvals[eval.JobID] = eval.ID
} else if pendingEval != eval.ID {
blocked := b.blocked[eval.JobID]
heap.Push(&blocked, eval)
b.blocked[eval.JobID] = blocked
b.stats.TotalBlocked += 1
return nil
}
// Find the pending by scheduler class
pending, ok := b.ready[eval.Type]
if !ok {
@ -318,6 +344,7 @@ func (b *EvalBroker) Ack(evalID string) error {
if !ok {
return fmt.Errorf("Evaluation ID not found")
}
jobID := unack.Eval.JobID
// Ensure we were able to stop the timer
if !unack.NackTimer.Stop() {
@ -327,11 +354,25 @@ func (b *EvalBroker) Ack(evalID string) error {
// Cleanup
delete(b.unack, evalID)
delete(b.evals, evalID)
delete(b.jobEvals, jobID)
// Update the stats
b.stats.TotalUnacked -= 1
bySched := b.stats.ByScheduler[unack.Eval.Type]
bySched.Unacked -= 1
// Check if there are any blocked evaluations
if blocked := b.blocked[jobID]; len(blocked) != 0 {
raw := heap.Pop(&blocked)
if len(blocked) > 0 {
b.blocked[jobID] = blocked
} else {
delete(b.blocked, jobID)
}
eval := raw.(*structs.Evaluation)
b.stats.TotalBlocked -= 1
return b.enqueueLocked(eval)
}
return nil
}
@ -358,7 +399,7 @@ func (b *EvalBroker) Nack(evalID string) error {
bySched.Unacked -= 1
// Re-enqueue the work
// TODO: Re-enqueue at higher priority to avoid starvation.
// XXX: Re-enqueue at higher priority to avoid starvation.
return b.enqueueLocked(unack.Eval)
}
@ -381,8 +422,11 @@ func (b *EvalBroker) Flush() {
// Reset the broker
b.stats.TotalReady = 0
b.stats.TotalUnacked = 0
b.stats.TotalBlocked = 0
b.stats.ByScheduler = make(map[string]*SchedulerStats)
b.evals = make(map[string]*structs.Evaluation)
b.evals = make(map[string]struct{})
b.jobEvals = make(map[string]string)
b.blocked = make(map[string]PendingEvaluations)
b.ready = make(map[string]PendingEvaluations)
b.unack = make(map[string]*unackEval)
}
@ -399,6 +443,7 @@ func (b *EvalBroker) Stats() *BrokerStats {
// Copy all the stats
stats.TotalReady = b.stats.TotalReady
stats.TotalUnacked = b.stats.TotalUnacked
stats.TotalBlocked = b.stats.TotalBlocked
for sched, subStat := range b.stats.ByScheduler {
subStatCopy := new(SchedulerStats)
*subStatCopy = *subStat
@ -415,6 +460,7 @@ func (b *EvalBroker) EmitStats(period time.Duration, stopCh chan struct{}) {
stats := b.Stats()
metrics.SetGauge([]string{"nomad", "broker", "total_ready"}, float32(stats.TotalReady))
metrics.SetGauge([]string{"nomad", "broker", "total_unacked"}, float32(stats.TotalUnacked))
metrics.SetGauge([]string{"nomad", "broker", "total_blocked"}, float32(stats.TotalBlocked))
for sched, schedStats := range stats.ByScheduler {
metrics.SetGauge([]string{"nomad", "broker", sched, "ready"}, float32(schedStats.Ready))
metrics.SetGauge([]string{"nomad", "broker", sched, "unacked"}, float32(schedStats.Unacked))
@ -430,6 +476,7 @@ func (b *EvalBroker) EmitStats(period time.Duration, stopCh chan struct{}) {
type BrokerStats struct {
TotalReady int
TotalUnacked int
TotalBlocked int
ByScheduler map[string]*SchedulerStats
}
@ -448,7 +495,7 @@ func (p PendingEvaluations) Len() int {
// so that the "min" in the min-heap is the element with the
// highest priority
func (p PendingEvaluations) Less(i, j int) bool {
if p[i].Priority != p[j].Priority {
if p[i].JobID != p[j].JobID && p[i].Priority != p[j].Priority {
return !(p[i].Priority < p[j].Priority)
}
return p[i].CreateIndex < p[j].CreateIndex

View file

@ -163,6 +163,158 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) {
}
}
func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) {
b := testBroker(t, 0)
b.SetEnabled(true)
eval := mockEval()
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
eval2 := mockEval()
eval2.JobID = eval.JobID
eval2.CreateIndex = eval.CreateIndex + 1
err = b.Enqueue(eval2)
if err != nil {
t.Fatalf("err: %v", err)
}
eval3 := mockEval()
eval3.JobID = eval.JobID
eval3.CreateIndex = eval.CreateIndex + 2
err = b.Enqueue(eval3)
if err != nil {
t.Fatalf("err: %v", err)
}
stats := b.Stats()
if stats.TotalReady != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 2 {
t.Fatalf("bad: %#v", stats)
}
// Dequeue should work
out, err := b.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != eval {
t.Fatalf("bad : %#v", out)
}
// Check the stats
stats = b.Stats()
if stats.TotalReady != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 2 {
t.Fatalf("bad: %#v", stats)
}
// Ack out
err = b.Ack(eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
// Check the stats
stats = b.Stats()
if stats.TotalReady != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 1 {
t.Fatalf("bad: %#v", stats)
}
// Dequeue should work
out, err = b.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != eval2 {
t.Fatalf("bad : %#v", out)
}
// Check the stats
stats = b.Stats()
if stats.TotalReady != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 1 {
t.Fatalf("bad: %#v", stats)
}
// Ack out
err = b.Ack(eval2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
// Check the stats
stats = b.Stats()
if stats.TotalReady != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 0 {
t.Fatalf("bad: %#v", stats)
}
// Dequeue should work
out, err = b.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != eval3 {
t.Fatalf("bad : %#v", out)
}
// Check the stats
stats = b.Stats()
if stats.TotalReady != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 0 {
t.Fatalf("bad: %#v", stats)
}
// Ack out
err = b.Ack(eval3.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
// Check the stats
stats = b.Stats()
if stats.TotalReady != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 0 {
t.Fatalf("bad: %#v", stats)
}
}
func TestEvalBroker_Enqueue_Disable(t *testing.T) {
b := testBroker(t, 0)
@ -299,7 +451,7 @@ func TestEvalBroker_Dequeue_Fairness(t *testing.T) {
// This will fail randomly at times. It is very hard to
// test deterministically that its acting randomly.
if counter >= 20 || counter <= -20 {
if counter >= 25 || counter <= -25 {
t.Fatalf("unlikely sequence: %d", counter)
}
}

View file

@ -116,6 +116,7 @@ func mockEval() *structs.Evaluation {
ID: generateUUID(),
Priority: 50,
Type: structs.JobTypeService,
JobID: generateUUID(),
Status: structs.EvalStatusPending,
}
return eval

View file

@ -616,6 +616,10 @@ type Evaluation struct {
// was created. (Job change, node failure, alloc failure, etc).
TriggeredBy string
// JobID is the job this evaluation is scoped to. Evalutions cannot
// be run in parallel for a given JobID, so we serialize on this.
JobID string
// Status of the evaluation
Status string