nomad: batch client updates for 50msec

This commit is contained in:
Armon Dadgar 2016-02-21 18:51:34 -08:00
parent e8c44448c2
commit 7fc7cd9453
3 changed files with 186 additions and 6 deletions

View file

@ -2,6 +2,7 @@ package nomad
import (
"fmt"
"sync"
"time"
"github.com/armon/go-metrics"
@ -10,9 +11,29 @@ import (
"github.com/hashicorp/nomad/nomad/watch"
)
const (
// batchUpdateInterval is how long we wait to batch updates
batchUpdateInterval = 50 * time.Millisecond
)
// Node endpoint is used for client interactions
type Node struct {
srv *Server
// updates holds pending client status updates for allocations
updates []*structs.Allocation
// updateFuture is used to wait for the pending batch update
// to complete. This may be nil if no batch is pending.
updateFuture *batchFuture
// updateTimer is the timer that will trigger the next batch
// update, and may be nil if there is no batch pending.
updateTimer *time.Timer
// updatesLock synchronizes access to the updates list,
// the future and the timer.
updatesLock sync.Mutex
}
// Register is used to upsert a client that is available for scheduling
@ -456,18 +477,59 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
return fmt.Errorf("must update a single allocation")
}
// Commit this update via Raft
_, index, err := n.srv.raftApply(structs.AllocClientUpdateRequestType, args)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.client: alloc update failed: %v", err)
// Add this to the batch
n.updatesLock.Lock()
n.updates = append(n.updates, args.Alloc...)
// Start a new batch if none
future := n.updateFuture
if future == nil {
future = NewBatchFuture()
n.updateFuture = future
n.updateTimer = time.AfterFunc(batchUpdateInterval, func() {
// Get the pending updates
n.updatesLock.Lock()
updates := n.updates
future := n.updateFuture
n.updates = nil
n.updateFuture = nil
n.updateTimer = nil
n.updatesLock.Unlock()
// Perform the batch update
n.batchUpdate(future, updates)
})
}
n.updatesLock.Unlock()
// Wait for the future
if err := future.Wait(); err != nil {
return err
}
// Setup the response
reply.Index = index
reply.Index = future.Index()
return nil
}
// batchUpdate is used to update all the allocations
func (n *Node) batchUpdate(future *batchFuture, updates []*structs.Allocation) {
// Prepare the batch update
batch := &structs.AllocUpdateRequest{
Alloc: updates,
WriteRequest: structs.WriteRequest{Region: n.srv.config.Region},
}
// Commit this update via Raft
_, index, err := n.srv.raftApply(structs.AllocClientUpdateRequestType, batch)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.client: alloc update failed: %v", err)
}
// Respond to the future
future.Respond(index, err)
}
// List is used to list the available nodes
func (n *Node) List(args *structs.NodeListRequest,
reply *structs.NodeListResponse) error {
@ -617,3 +679,35 @@ func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint6
}
return evalIDs, evalIndex, nil
}
// batchFuture is used to wait on a batch update to complete
type batchFuture struct {
doneCh chan struct{}
err error
index uint64
}
// NewBatchFuture creates a new batch future
func NewBatchFuture() *batchFuture {
return &batchFuture{
doneCh: make(chan struct{}),
}
}
// Wait is used to block for the future to complete and returns the error
func (b *batchFuture) Wait() error {
<-b.doneCh
return b.err
}
// Index is used to return the index of the batch, only after Wait()
func (b *batchFuture) Index() uint64 {
return b.index
}
// Respond is used to unblock the future
func (b *batchFuture) Respond(index uint64, err error) {
b.index = index
b.err = err
close(b.doneCh)
}

View file

@ -1,6 +1,7 @@
package nomad
import (
"fmt"
"reflect"
"testing"
"time"
@ -817,12 +818,70 @@ func TestClientEndpoint_UpdateAlloc(t *testing.T) {
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp2 structs.NodeAllocsResponse
start := time.Now()
if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", update, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if resp2.Index == 0 {
t.Fatalf("Bad index: %d", resp2.Index)
}
if diff := time.Since(start); diff < batchUpdateInterval {
t.Fatalf("too fast: %v", diff)
}
// Lookup the alloc
out, err := state.AllocByID(alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out.ClientStatus != structs.AllocClientStatusFailed {
t.Fatalf("Bad: %#v", out)
}
}
func TestClientEndpoint_BatchUpdate(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
node := mock.Node()
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
t.Fatalf("err: %v", err)
}
// Inject fake evaluations
alloc := mock.Alloc()
alloc.NodeID = node.ID
state := s1.fsm.State()
err := state.UpsertAllocs(100, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}
// Attempt update
clientAlloc := new(structs.Allocation)
*clientAlloc = *alloc
clientAlloc.ClientStatus = structs.AllocClientStatusFailed
// Call to do the batch update
bf := NewBatchFuture()
endpoint := s1.endpoints.Node
endpoint.batchUpdate(bf, []*structs.Allocation{clientAlloc})
if err := bf.Wait(); err != nil {
t.Fatalf("err: %v", err)
}
if bf.Index() == 0 {
t.Fatalf("Bad index: %d", bf.Index())
}
// Lookup the alloc
out, err := state.AllocByID(alloc.ID)
@ -1168,3 +1227,30 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) {
t.Fatalf("bad: %#v", resp4.Nodes)
}
}
func TestBatchFuture(t *testing.T) {
bf := NewBatchFuture()
// Async respond to the future
expect := fmt.Errorf("testing")
go func() {
time.Sleep(10 * time.Millisecond)
bf.Respond(1000, expect)
}()
// Block for the result
start := time.Now()
err := bf.Wait()
diff := time.Since(start)
if diff < 5*time.Millisecond {
t.Fatalf("too fast")
}
// Check the results
if err != expect {
t.Fatalf("bad: %s", err)
}
if bf.Index() != 1000 {
t.Fatalf("bad: %d", bf.Index())
}
}

View file

@ -374,7 +374,7 @@ func (s *Server) Leave() error {
func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
// Create endpoints
s.endpoints.Status = &Status{s}
s.endpoints.Node = &Node{s}
s.endpoints.Node = &Node{srv: s}
s.endpoints.Job = &Job{s}
s.endpoints.Eval = &Eval{s}
s.endpoints.Plan = &Plan{s}