diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go
index a56d842f3..0f428da96 100644
--- a/nomad/node_endpoint.go
+++ b/nomad/node_endpoint.go
@@ -2,6 +2,7 @@ package nomad
 
 import (
 	"fmt"
+	"sync"
 	"time"
 
 	"github.com/armon/go-metrics"
@@ -10,9 +11,29 @@ import (
 	"github.com/hashicorp/nomad/nomad/watch"
 )
 
+const (
+	// batchUpdateInterval is how long we wait to batch updates
+	batchUpdateInterval = 50 * time.Millisecond
+)
+
 // Node endpoint is used for client interactions
 type Node struct {
 	srv *Server
+
+	// updates holds pending client status updates for allocations
+	updates []*structs.Allocation
+
+	// updateFuture is used to wait for the pending batch update
+	// to complete. This may be nil if no batch is pending.
+	updateFuture *batchFuture
+
+	// updateTimer is the timer that will trigger the next batch
+	// update, and may be nil if there is no batch pending.
+	updateTimer *time.Timer
+
+	// updatesLock synchronizes access to the updates list,
+	// the future and the timer.
+	updatesLock sync.Mutex
 }
 
 // Register is used to upsert a client that is available for scheduling
@@ -456,18 +477,59 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 		return fmt.Errorf("must update a single allocation")
 	}
 
-	// Commit this update via Raft
-	_, index, err := n.srv.raftApply(structs.AllocClientUpdateRequestType, args)
-	if err != nil {
-		n.srv.logger.Printf("[ERR] nomad.client: alloc update failed: %v", err)
+	// Add this to the batch
+	n.updatesLock.Lock()
+	n.updates = append(n.updates, args.Alloc...)
+
+	// Start a new batch if none is pending
+	future := n.updateFuture
+	if future == nil {
+		future = NewBatchFuture()
+		n.updateFuture = future
+		n.updateTimer = time.AfterFunc(batchUpdateInterval, func() {
+			// Get the pending updates
+			n.updatesLock.Lock()
+			updates := n.updates
+			future := n.updateFuture
+			n.updates = nil
+			n.updateFuture = nil
+			n.updateTimer = nil
+			n.updatesLock.Unlock()
+
+			// Perform the batch update
+			n.batchUpdate(future, updates)
+		})
+	}
+	n.updatesLock.Unlock()
+
+	// Wait for the future
+	if err := future.Wait(); err != nil {
 		return err
 	}
 
 	// Setup the response
-	reply.Index = index
+	reply.Index = future.Index()
 	return nil
 }
 
+// batchUpdate is used to update all the allocations
+func (n *Node) batchUpdate(future *batchFuture, updates []*structs.Allocation) {
+	// Prepare the batch update
+	batch := &structs.AllocUpdateRequest{
+		Alloc:        updates,
+		WriteRequest: structs.WriteRequest{Region: n.srv.config.Region},
+	}
+
+	// Commit this update via Raft
+	_, index, err := n.srv.raftApply(structs.AllocClientUpdateRequestType, batch)
+	if err != nil {
+		n.srv.logger.Printf("[ERR] nomad.client: alloc update failed: %v", err)
+	}
+
+	// Respond to the future
+	future.Respond(index, err)
+}
+
 // List is used to list the available nodes
 func (n *Node) List(args *structs.NodeListRequest,
 	reply *structs.NodeListResponse) error {
@@ -617,3 +679,35 @@ func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint6
 	}
 	return evalIDs, evalIndex, nil
 }
+
+// batchFuture is used to wait on a batch update to complete
+type batchFuture struct {
+	doneCh chan struct{}
+	err    error
+	index  uint64
+}
+
+// NewBatchFuture creates a new batch future
+func NewBatchFuture() *batchFuture {
+	return &batchFuture{
+		doneCh: make(chan struct{}),
+	}
+}
+
+// Wait is used to block for the future to complete and returns the error
+func (b *batchFuture) Wait() error {
+	<-b.doneCh
+	return b.err
+}
+
+// Index is used to return the index of the batch, only after Wait()
+func (b *batchFuture) Index() uint64 {
+	return b.index
+}
+
+// Respond is used to unblock the future
+func (b *batchFuture) Respond(index uint64, err error) {
+	b.index = index
+	b.err = err
+	close(b.doneCh)
+}
diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go
index c670e93dd..f7886b737 100644
--- a/nomad/node_endpoint_test.go
+++ b/nomad/node_endpoint_test.go
@@ -1,6 +1,7 @@
 package nomad
 
 import (
+	"fmt"
 	"reflect"
 	"testing"
 	"time"
@@ -817,12 +818,70 @@ func TestClientEndpoint_UpdateAlloc(t *testing.T) {
 		WriteRequest: structs.WriteRequest{Region: "global"},
 	}
 	var resp2 structs.NodeAllocsResponse
+	start := time.Now()
 	if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", update, &resp2); err != nil {
 		t.Fatalf("err: %v", err)
 	}
 	if resp2.Index == 0 {
 		t.Fatalf("Bad index: %d", resp2.Index)
 	}
+	if diff := time.Since(start); diff < batchUpdateInterval {
+		t.Fatalf("too fast: %v", diff)
+	}
+
+	// Lookup the alloc
+	out, err := state.AllocByID(alloc.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if out.ClientStatus != structs.AllocClientStatusFailed {
+		t.Fatalf("Bad: %#v", out)
+	}
+}
+
+func TestClientEndpoint_BatchUpdate(t *testing.T) {
+	s1 := testServer(t, nil)
+	defer s1.Shutdown()
+	codec := rpcClient(t, s1)
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Create the register request
+	node := mock.Node()
+	reg := &structs.NodeRegisterRequest{
+		Node:         node,
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+
+	// Fetch the response
+	var resp structs.GenericResponse
+	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Inject a fake allocation
+	alloc := mock.Alloc()
+	alloc.NodeID = node.ID
+	state := s1.fsm.State()
+	err := state.UpsertAllocs(100, []*structs.Allocation{alloc})
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Attempt update
+	clientAlloc := new(structs.Allocation)
+	*clientAlloc = *alloc
+	clientAlloc.ClientStatus = structs.AllocClientStatusFailed
+
+	// Call to do the batch update
+	bf := NewBatchFuture()
+	endpoint := s1.endpoints.Node
+	endpoint.batchUpdate(bf, []*structs.Allocation{clientAlloc})
+	if err := bf.Wait(); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if bf.Index() == 0 {
+		t.Fatalf("Bad index: %d", bf.Index())
+	}
 
 	// Lookup the alloc
 	out, err := state.AllocByID(alloc.ID)
@@ -1168,3 +1227,30 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) {
 		t.Fatalf("bad: %#v", resp4.Nodes)
 	}
 }
+
+func TestBatchFuture(t *testing.T) {
+	bf := NewBatchFuture()
+
+	// Async respond to the future
+	expect := fmt.Errorf("testing")
+	go func() {
+		time.Sleep(10 * time.Millisecond)
+		bf.Respond(1000, expect)
+	}()
+
+	// Block for the result
+	start := time.Now()
+	err := bf.Wait()
+	diff := time.Since(start)
+	if diff < 5*time.Millisecond {
+		t.Fatalf("too fast: %v", diff)
+	}
+
+	// Check the results
+	if err != expect {
+		t.Fatalf("bad: %s", err)
+	}
+	if bf.Index() != 1000 {
+		t.Fatalf("bad: %d", bf.Index())
+	}
+}
diff --git a/nomad/server.go b/nomad/server.go
index e377ef304..f0a66e688 100644
--- a/nomad/server.go
+++ b/nomad/server.go
@@ -374,7 +374,7 @@ func (s *Server) Leave() error {
 func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
 	// Create endpoints
 	s.endpoints.Status = &Status{s}
-	s.endpoints.Node = &Node{s}
+	s.endpoints.Node = &Node{srv: s}
 	s.endpoints.Job = &Job{s}
 	s.endpoints.Eval = &Eval{s}
 	s.endpoints.Plan = &Plan{s}
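
The core of this change is a small coalescing pattern: every `UpdateAlloc` call appends its allocations to a shared slice, the first caller in a window arms a 50ms timer, and all callers in that window block on one shared future that is resolved by a single Raft apply. Below is a minimal, self-contained sketch of that pattern outside of Nomad; the `batcher` type, its `Submit` method, and the stub `commit` function are illustrative names introduced here, not Nomad API.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// future mirrors the patch's batchFuture: closing doneCh publishes
// index and err to every waiter.
type future struct {
	doneCh chan struct{}
	err    error
	index  uint64
}

func newFuture() *future { return &future{doneCh: make(chan struct{})} }

// Wait blocks until the batch commits and returns its error.
func (f *future) Wait() error {
	<-f.doneCh
	return f.err
}

// respond records the result and unblocks all waiters.
func (f *future) respond(index uint64, err error) {
	f.index = index
	f.err = err
	close(f.doneCh)
}

// batcher coalesces updates submitted within `interval` into one commit.
type batcher struct {
	mu       sync.Mutex
	pending  []string
	fut      *future
	interval time.Duration
	// commit stands in for the Raft apply; it returns the commit index.
	commit func(batch []string) (uint64, error)
}

// Submit queues an update and blocks until the batch it joined commits.
func (b *batcher) Submit(update string) (uint64, error) {
	b.mu.Lock()
	b.pending = append(b.pending, update)

	// First caller in this window creates the future and arms the timer.
	fut := b.fut
	if fut == nil {
		fut = newFuture()
		b.fut = fut
		time.AfterFunc(b.interval, func() {
			// Drain the window so later callers start a fresh batch.
			b.mu.Lock()
			batch, f := b.pending, b.fut
			b.pending, b.fut = nil, nil
			b.mu.Unlock()

			// One commit for the whole window; wake every waiter.
			index, err := b.commit(batch)
			f.respond(index, err)
		})
	}
	b.mu.Unlock()

	if err := fut.Wait(); err != nil {
		return 0, err
	}
	return fut.index, nil
}

func main() {
	b := &batcher{
		interval: 50 * time.Millisecond,
		commit: func(batch []string) (uint64, error) {
			fmt.Printf("committing %d updates in one apply\n", len(batch))
			return 42, nil
		},
	}

	// Ten concurrent callers land in one window and share a single commit.
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			index, err := b.Submit(fmt.Sprintf("alloc-%d", i))
			fmt.Println("caller", i, "saw index", index, "err", err)
		}(i)
	}
	wg.Wait()
}
```

The design choice worth noting is the same one the patch makes: `respond` closes `doneCh`, so one commit wakes every waiter, and callers read the index only after `Wait` returns, which is why `Index()` in the patch is documented as valid "only after Wait()" — the channel close supplies the happens-before edge that makes the unsynchronized field reads race-free.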