nomad: wait for plan to apply async
This commit is contained in:
parent
2cebcf289c
commit
8edc9da37d
|
@ -75,25 +75,24 @@ func (s *Server) planApply() {
|
|||
continue
|
||||
}
|
||||
|
||||
// Apply the plan if there is anything to do
|
||||
if !result.IsNoOp() {
|
||||
future, err := s.applyPlan(result)
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] nomad: failed to submit plan: %v", err)
|
||||
pending.respond(nil, err)
|
||||
continue
|
||||
}
|
||||
allocIndex, err := s.planWaitFuture(future)
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] nomad: failed to apply plan: %v", err)
|
||||
pending.respond(nil, err)
|
||||
continue
|
||||
}
|
||||
result.AllocIndex = allocIndex
|
||||
// Fast-path the response if there is nothing to do
|
||||
if result.IsNoOp() {
|
||||
pending.respond(result, nil)
|
||||
continue
|
||||
}
|
||||
|
||||
// Respond to the plan
|
||||
pending.respond(result, nil)
|
||||
// Dispatch the Raft transaction for the plan
|
||||
future, err := s.applyPlan(result)
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] nomad: failed to submit plan: %v", err)
|
||||
pending.respond(nil, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Respond to the plan in async
|
||||
waitCh := make(chan struct{})
|
||||
go s.asyncPlanWait(waitCh, future, result, pending)
|
||||
<-waitCh
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -111,13 +110,22 @@ func (s *Server) applyPlan(result *structs.PlanResult) (raft.ApplyFuture, error)
|
|||
return s.raftApplyFuture(structs.AllocUpdateRequestType, &req)
|
||||
}
|
||||
|
||||
// planWaitFuture is used to wait for the Raft future to complete
|
||||
func (s *Server) planWaitFuture(future raft.ApplyFuture) (uint64, error) {
|
||||
// asyncPlanWait is used to apply and respond to a plan async
|
||||
func (s *Server) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
|
||||
result *structs.PlanResult, pending *pendingPlan) {
|
||||
defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now())
|
||||
defer close(waitCh)
|
||||
|
||||
// Wait for the plan to apply
|
||||
if err := future.Error(); err != nil {
|
||||
return 0, err
|
||||
s.logger.Printf("[ERR] nomad: failed to apply plan: %v", err)
|
||||
pending.respond(nil, err)
|
||||
return
|
||||
}
|
||||
return future.Index(), nil
|
||||
|
||||
// Respond to the plan
|
||||
result.AllocIndex = future.Index()
|
||||
pending.respond(result, nil)
|
||||
}
|
||||
|
||||
// evaluatePlan is used to determine what portions of a plan
|
||||
|
|
|
@ -7,8 +7,17 @@ import (
|
|||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/hashicorp/raft"
|
||||
)
|
||||
|
||||
// planWaitFuture is used to wait for the Raft future to complete
|
||||
func planWaitFuture(future raft.ApplyFuture) (uint64, error) {
|
||||
if err := future.Error(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return future.Index(), nil
|
||||
}
|
||||
|
||||
func testRegisterNode(t *testing.T, s *Server, n *structs.Node) {
|
||||
// Create the register request
|
||||
req := &structs.NodeRegisterRequest{
|
||||
|
@ -50,7 +59,7 @@ func TestPlanApply_applyPlan(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
index, err := s1.planWaitFuture(future)
|
||||
index, err := planWaitFuture(future)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -95,7 +104,7 @@ func TestPlanApply_applyPlan(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
index, err = s1.planWaitFuture(future)
|
||||
index, err = planWaitFuture(future)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue