nomad: overlap plan evaluation with apply
This commit is contained in:
parent
08a32d0952
commit
8715d8ab8f
|
@ -37,6 +37,12 @@ import (
|
|||
// but there are many of those and only a single plan verifier.
|
||||
//
|
||||
func (s *Server) planApply() {
|
||||
// waitCh is used to track an outstanding application
|
||||
// while snap holds an optimistic state which includes
|
||||
// that plan application.
|
||||
var waitCh chan struct{}
|
||||
var snap *state.StateSnapshot
|
||||
|
||||
for {
|
||||
// Pull the next pending plan, exit if we are no longer leader
|
||||
pending, err := s.planQueue.Dequeue(0)
|
||||
|
@ -59,12 +65,23 @@ func (s *Server) planApply() {
|
|||
continue
|
||||
}
|
||||
|
||||
// Check if out last plan has completed
|
||||
select {
|
||||
case <-waitCh:
|
||||
waitCh = nil
|
||||
snap = nil
|
||||
default:
|
||||
}
|
||||
|
||||
// Snapshot the state so that we have a consistent view of the world
|
||||
snap, err := s.fsm.State().Snapshot()
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] nomad: failed to snapshot state: %v", err)
|
||||
pending.respond(nil, err)
|
||||
continue
|
||||
// if no snapshot is available
|
||||
if snap == nil {
|
||||
snap, err = s.fsm.State().Snapshot()
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] nomad: failed to snapshot state: %v", err)
|
||||
pending.respond(nil, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Evaluate the plan
|
||||
|
@ -81,6 +98,19 @@ func (s *Server) planApply() {
|
|||
continue
|
||||
}
|
||||
|
||||
// Ensure any parallel apply is complete before
|
||||
// starting the next one. This also limits how out
|
||||
// of date our snapshot can be.
|
||||
if waitCh != nil {
|
||||
<-waitCh
|
||||
snap, err = s.fsm.State().Snapshot()
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] nomad: failed to snapshot state: %v", err)
|
||||
pending.respond(nil, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Dispatch the Raft transaction for the plan
|
||||
future, err := s.applyPlan(result, snap)
|
||||
if err != nil {
|
||||
|
@ -90,9 +120,8 @@ func (s *Server) planApply() {
|
|||
}
|
||||
|
||||
// Respond to the plan in async
|
||||
waitCh := make(chan struct{})
|
||||
waitCh = make(chan struct{})
|
||||
go s.asyncPlanWait(waitCh, future, result, pending)
|
||||
<-waitCh
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue